Skip to content

Commit

Permalink
plugins: Include log drop count in the status plugin's metrics
Browse files Browse the repository at this point in the history
This change adds the count of the decision log events that
were dropped when the rate limit was exceeded to the status
plugin's metrics provider. These metrics are part of the periodic
status update and hence should allow control planes to monitor the
number of dropped log events.

Signed-off-by: Ashutosh Narkar <anarkar4387@gmail.com>
  • Loading branch information
ashutosh-narkar committed Apr 2, 2021
1 parent 8b2e9ff commit 489c581
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 5 deletions.
6 changes: 3 additions & 3 deletions plugins/discovery/discovery.go
Expand Up @@ -370,7 +370,7 @@ func getPluginSet(factories map[string]plugins.Factory, manager *plugins.Manager
}

if decisionLogsConfig != nil {
p, created := getDecisionLogsPlugin(manager, decisionLogsConfig)
p, created := getDecisionLogsPlugin(manager, decisionLogsConfig, m)
if created {
starts = append(starts, p)
} else if p != nil {
Expand Down Expand Up @@ -405,10 +405,10 @@ func getBundlePlugin(m *plugins.Manager, config *bundle.Config) (plugin *bundle.
return plugin, created
}

func getDecisionLogsPlugin(m *plugins.Manager, config *logs.Config) (plugin *logs.Plugin, created bool) {
func getDecisionLogsPlugin(m *plugins.Manager, config *logs.Config, metrics metrics.Metrics) (plugin *logs.Plugin, created bool) {
plugin = logs.Lookup(m)
if plugin == nil {
plugin = logs.New(config, m)
plugin = logs.New(config, m).WithMetrics(metrics)
m.Register(logs.Name, plugin)
created = true
}
Expand Down
127 changes: 127 additions & 0 deletions plugins/discovery/discovery_test.go
Expand Up @@ -16,12 +16,18 @@ import (
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"

"github.com/open-policy-agent/opa/ast"
bundleApi "github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/download"
"github.com/open-policy-agent/opa/metrics"
"github.com/open-policy-agent/opa/plugins"
"github.com/open-policy-agent/opa/plugins/bundle"
"github.com/open-policy-agent/opa/plugins/logs"
"github.com/open-policy-agent/opa/plugins/status"
"github.com/open-policy-agent/opa/server"
"github.com/open-policy-agent/opa/storage/inmem"
"github.com/open-policy-agent/opa/topdown/cache"
"github.com/open-policy-agent/opa/util"
Expand Down Expand Up @@ -991,6 +997,127 @@ func TestStatusUpdatesTimestamp(t *testing.T) {
}
}

func TestStatusMetricsForLogDrops(t *testing.T) {

ts := testServer{t: t}
ts.Start()
defer ts.Stop()

logLevel := logrus.GetLevel()
defer logrus.SetLevel(logLevel)

// Ensure that status messages are printed to console even with the standard logger configured to log errors only
logrus.SetLevel(logrus.ErrorLevel)

hook := test.NewLocal(plugins.GetConsoleLogger())

ctx := context.Background()

manager, err := plugins.New([]byte(fmt.Sprintf(`{
"services": {
"localhost": {
"url": %q
}
},
"discovery": {"name": "config"}
}`, ts.server.URL)), "test-id", inmem.New())
if err != nil {
t.Fatal(err)
}

initialBundle := makeDataBundle(1, `
{
"config": {
"status": {"console": true},
"decision_logs": {
"service": "localhost",
"reporting": {
"max_decisions_per_second": 1
}
}
}
}
`)

disco, err := New(manager, Metrics(metrics.New()))
if err != nil {
t.Fatal(err)
}

ps, err := disco.processBundle(ctx, initialBundle)
if err != nil {
t.Fatal(err)
}

// start the decision log and status plugins
for _, p := range ps.Start {
if err := p.Start(ctx); err != nil {
t.Fatal(err)
}
}

plugin := logs.Lookup(manager)
if plugin == nil {
t.Fatal("Expected decision log plugin registered on manager")
}

var input interface{} = map[string]interface{}{"method": "GET"}
var result interface{} = false

event1 := &server.Info{
DecisionID: "abc",
Path: "foo/bar",
Input: &input,
Results: &result,
RemoteAddr: "test-1",
}

event2 := &server.Info{
DecisionID: "def",
Path: "foo/baz",
Input: &input,
Results: &result,
RemoteAddr: "test-2",
}

event3 := &server.Info{
DecisionID: "ghi",
Path: "foo/aux",
Input: &input,
Results: &result,
RemoteAddr: "test-3",
}

_ = plugin.Log(ctx, event1) // event 1 should be written into the decision log encoder
_ = plugin.Log(ctx, event2) // event 2 should not be written into the decision log encoder as rate limit exceeded
_ = plugin.Log(ctx, event3) // event 3 should not be written into the decision log encoder as rate limit exceeded

// trigger a status update
disco.oneShot(ctx, download.Update{ETag: "etag-1", Bundle: makeDataBundle(1, `{
"config": {
"bundle": {"name": "test1"}
}
}`)})

entries := hook.AllEntries()
if len(entries) == 0 {
t.Fatal("Expected log entries but got none")
}

// Pick the last entry as it should have the drop count
e := entries[len(entries)-1]

if _, ok := e.Data["metrics"]; !ok {
t.Fatal("Expected metrics")
}

exp := map[string]interface{}{"<built-in>": map[string]interface{}{"counter_decision_logs_dropped": json.Number("2")}}

if !reflect.DeepEqual(e.Data["metrics"], exp) {
t.Fatalf("Expected %v but got %v", exp, e.Data["metrics"])
}
}

func makeDataBundle(n int, s string) *bundleApi.Bundle {
return &bundleApi.Bundle{
Manifest: bundleApi.Manifest{Revision: fmt.Sprintf("test-revision-%v", n)},
Expand Down
17 changes: 15 additions & 2 deletions plugins/logs/plugin.go
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"encoding/json"
"fmt"

"math"
"math/rand"
"net/http"
Expand All @@ -17,17 +18,17 @@ import (
"sync"
"time"

"github.com/open-policy-agent/opa/sdk"

"github.com/pkg/errors"

"golang.org/x/time/rate"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/internal/ref"
"github.com/open-policy-agent/opa/metrics"
"github.com/open-policy-agent/opa/plugins"
"github.com/open-policy-agent/opa/plugins/rest"
"github.com/open-policy-agent/opa/rego"
"github.com/open-policy-agent/opa/sdk"
"github.com/open-policy-agent/opa/server"
"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/util"
Expand Down Expand Up @@ -214,6 +215,7 @@ const (
defaultUploadSizeLimitBytes = int64(32768) // 32KB limit
defaultBufferSizeLimitBytes = int64(0) // unlimited
defaultMaskDecisionPath = "/system/log/mask"
logDropCounterName = "decision_logs_dropped"
)

// ReportingConfig represents configuration for the plugin's reporting behaviour.
Expand Down Expand Up @@ -345,6 +347,7 @@ type Plugin struct {
maskMutex sync.Mutex
logger sdk.Logger
limiter *rate.Limiter
metrics metrics.Metrics
}

type reconfigure struct {
Expand Down Expand Up @@ -396,6 +399,12 @@ func New(parsedConfig *Config, manager *plugins.Manager) *Plugin {
return plugin
}

// WithMetrics sets the global metrics provider to be used by the plugin.
func (p *Plugin) WithMetrics(m metrics.Metrics) *Plugin {
p.metrics = m
return p
}

// Name identifies the plugin on manager.
const Name = "decision_logs"

Expand Down Expand Up @@ -668,6 +677,10 @@ func (p *Plugin) reconfigure(config interface{}) {
func (p *Plugin) encodeAndBufferEvent(event EventV1) {
if p.limiter != nil {
if !p.limiter.Allow() {
if p.metrics != nil {
p.metrics.Counter(logDropCounterName).Incr()
}

p.logger.Error("Decision log dropped as rate limit exceeded. Reduce reporting interval or increase rate limit.")
return
}
Expand Down
126 changes: 126 additions & 0 deletions plugins/logs/plugin_test.go
Expand Up @@ -22,13 +22,17 @@ import (
"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/metrics"
"github.com/open-policy-agent/opa/plugins"
"github.com/open-policy-agent/opa/plugins/bundle"
"github.com/open-policy-agent/opa/plugins/status"
"github.com/open-policy-agent/opa/rego"
"github.com/open-policy-agent/opa/server"
"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/storage/inmem"
"github.com/open-policy-agent/opa/topdown"
"github.com/open-policy-agent/opa/util"
"github.com/open-policy-agent/opa/version"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -738,6 +742,113 @@ func TestPluginRateLimitRequeue(t *testing.T) {
}
}

func TestPluginRateLimitDropCountStatus(t *testing.T) {
ctx := context.Background()

ts, err := time.Parse(time.RFC3339Nano, "2018-01-01T12:00:00.123456Z")
if err != nil {
panic(err)
}

numDecisions := 1 // 1 decision per second

fixture := newTestFixture(t, func(c *Config) {
limit := float64(numDecisions)
c.Reporting.MaxDecisionsPerSecond = &limit
}, func(c *Config) {
limit := int64(300)
c.Reporting.UploadSizeLimitBytes = &limit
})
defer fixture.server.stop()

fixture.plugin.metrics = metrics.New()

var input interface{} = map[string]interface{}{"method": "GET"}
var result interface{} = false

event1 := &server.Info{
DecisionID: "abc",
Path: "foo/bar",
Input: &input,
Results: &result,
RemoteAddr: "test-1",
Timestamp: ts,
}

event2 := &server.Info{
DecisionID: "def",
Path: "foo/baz",
Input: &input,
Results: &result,
RemoteAddr: "test-2",
Timestamp: ts,
}

event3 := &server.Info{
DecisionID: "ghi",
Path: "foo/aux",
Input: &input,
Results: &result,
RemoteAddr: "test-3",
Timestamp: ts,
}

_ = fixture.plugin.Log(ctx, event1) // event 1 should be written into the encoder

if fixture.plugin.enc.bytesWritten == 0 {
t.Fatal("Expected event to be written into the encoder")
}

// Create a status plugin that logs to console
pluginConfig := []byte(fmt.Sprintf(`{
"console": true,
}`))

config, _ := status.ParseConfig(pluginConfig, fixture.manager.Services())
p := status.New(config, fixture.manager).WithMetrics(fixture.plugin.metrics)

fixture.manager.Register(status.Name, p)
if err := fixture.manager.Start(ctx); err != nil {
panic(err)
}

logLevel := logrus.GetLevel()
defer logrus.SetLevel(logLevel)

// Ensure that status messages are printed to console even with the standard logger configured to log errors only
logrus.SetLevel(logrus.ErrorLevel)

hook := test.NewLocal(plugins.GetConsoleLogger())

_ = fixture.plugin.Log(ctx, event2) // event 2 should not be written into the encoder as rate limit exceeded
_ = fixture.plugin.Log(ctx, event3) // event 3 should not be written into the encoder as rate limit exceeded

// Trigger a status update
status := testStatus()
p.UpdateDiscoveryStatus(*status)

// Give the logger / console some time to process and print the events
time.Sleep(10 * time.Millisecond)

entries := hook.AllEntries()
if len(entries) == 0 {
t.Fatal("Expected log entries but got none")
}

// Pick the last entry as it should have the drop count
e := entries[len(entries)-1]

if _, ok := e.Data["metrics"]; !ok {
t.Fatal("Expected metrics")
}

exp := map[string]interface{}{"<built-in>": map[string]interface{}{"counter_decision_logs_dropped": json.Number("2")}}

if !reflect.DeepEqual(e.Data["metrics"], exp) {
t.Fatalf("Expected %v but got %v", exp, e.Data["metrics"])
}
}

func TestPluginRateLimitBadConfig(t *testing.T) {
manager, _ := plugins.New(nil, "test-instance-id", inmem.New())

Expand Down Expand Up @@ -1581,3 +1692,18 @@ func compareLogEvent(t *testing.T, actual []byte, exp EventV1) {
t.Fatalf("Expected %+v but got %+v", exp, events[0])
}
}

func testStatus() *bundle.Status {

tDownload, _ := time.Parse("2018-01-01T00:00:00.0000000Z", time.RFC3339Nano)
tActivate, _ := time.Parse("2018-01-01T00:00:01.0000000Z", time.RFC3339Nano)

status := bundle.Status{
Name: "example/authz",
ActiveRevision: "quickbrawnfaux",
LastSuccessfulDownload: tDownload,
LastSuccessfulActivation: tActivate,
}

return &status
}

0 comments on commit 489c581

Please sign in to comment.