Skip to content

Commit

Permalink
tracing: make otel dependency optional for rego+topdown (open-policy-…
Browse files Browse the repository at this point in the history
…agent#4127)

This follows the same approach as the wasm feature: by default, importers
of

    github.com/open-policy-agent/opa/rego
    github.com/open-policy-agent/opa/topdown

will not get a transitive dependency on the otel libraries.

In terms of functionality, nothing changes for the server and runtime.

Signed-off-by: Stephan Renatus <stephan.renatus@gmail.com>
  • Loading branch information
srenatus authored and yzeng25 committed Dec 14, 2021
1 parent 112802c commit de9a48f
Show file tree
Hide file tree
Showing 17 changed files with 252 additions and 104 deletions.
1 change: 1 addition & 0 deletions cmd/features.go
Expand Up @@ -2,6 +2,7 @@
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

//go:build opa_wasm
// +build opa_wasm

package cmd
Expand Down
2 changes: 1 addition & 1 deletion cmd/run.go
Expand Up @@ -282,7 +282,7 @@ func initRuntime(ctx context.Context, params runCmdParams, args []string) (*runt
return nil, err
}

rt.SetDistributedTracingErrorHandler()
rt.SetDistributedTracingLogging()

return rt, nil
}
Expand Down
34 changes: 34 additions & 0 deletions features/tracing/tracing.go
@@ -0,0 +1,34 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package tracing

import (
"net/http"

pkg_tracing "github.com/open-policy-agent/opa/tracing"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

func init() {
pkg_tracing.RegisterHTTPTracing(&factory{})
}

type factory struct{}

func (*factory) NewTransport(tr http.RoundTripper, opts pkg_tracing.Options) http.RoundTripper {
return otelhttp.NewTransport(tr, convertOpts(opts)...)
}

func (*factory) NewHandler(f http.Handler, label string, opts pkg_tracing.Options) http.Handler {
return otelhttp.NewHandler(f, label, convertOpts(opts)...)
}

func convertOpts(opts pkg_tracing.Options) []otelhttp.Option {
otelOpts := make([]otelhttp.Option, 0, len(opts))
for _, opt := range opts {
otelOpts = append(otelOpts, opt.(otelhttp.Option))
}
return otelOpts
}
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -13,6 +13,7 @@ require (
github.com/fsnotify/fsnotify v1.5.1
github.com/ghodss/yaml v1.0.0
github.com/go-ini/ini v1.66.2
github.com/go-logr/logr v1.2.1
github.com/gobwas/glob v0.2.3
github.com/golang/glog v1.0.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand Down
71 changes: 58 additions & 13 deletions internal/distributedtracing/distributedtracing.go
@@ -1,3 +1,7 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package distributedtracing

import (
Expand All @@ -7,9 +11,7 @@ import (
"fmt"
"io/ioutil"

"github.com/open-policy-agent/opa/config"
"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/util"
"github.com/go-logr/logr"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
Expand All @@ -19,6 +21,16 @@ import (
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
"google.golang.org/grpc/credentials"

"github.com/open-policy-agent/opa/config"
"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/tracing"
"github.com/open-policy-agent/opa/util"

// The import registers opentelemetry with the top-level `tracing` package,
// so the latter can be used from rego/topdown without an explicit build-time
// dependency.
_ "github.com/open-policy-agent/opa/features/tracing"
)

const (
Expand Down Expand Up @@ -55,9 +67,7 @@ type distributedTracingConfig struct {
TLSCACertFile string `json:"tls_ca_cert_file,omitempty"`
}

type Options []otelhttp.Option

func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace.Exporter, options Options, err error) {
func Init(ctx context.Context, raw []byte, id string) (*otlptrace.Exporter, tracing.Options, error) {
parsedConfig, err := config.ParseConfig(raw, id)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -87,8 +97,10 @@ func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace.
return nil, nil, err
}

traceExporter = otlptracegrpc.NewUnstarted(otlptracegrpc.WithEndpoint(distributedTracingConfig.Address),
tlsOption)
traceExporter := otlptracegrpc.NewUnstarted(
otlptracegrpc.WithEndpoint(distributedTracingConfig.Address),
tlsOption,
)

res, err := resource.New(ctx,
resource.WithAttributes(
Expand All @@ -105,18 +117,17 @@ func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace.
trace.WithSpanProcessor(trace.NewBatchSpanProcessor(traceExporter)),
)

options = append(options,
options := tracing.NewOptions(
otelhttp.WithTracerProvider(traceProvider),
otelhttp.WithPropagators(propagation.TraceContext{}),
)

return traceExporter, options, nil
}

func SetErrorHandler(logger logging.Logger) {
otel.SetErrorHandler(&errorHandler{
logger: logger,
})
func SetupLogging(logger logging.Logger) {
otel.SetErrorHandler(&errorHandler{logger: logger})
otel.SetLogger(logr.New(&sink{logger: logger}))
}

func parseDistributedTracingConfig(raw []byte) (*distributedTracingConfig, error) {
Expand Down Expand Up @@ -241,3 +252,37 @@ type errorHandler struct {
func (e *errorHandler) Handle(err error) {
e.logger.Warn("Distributed tracing: " + err.Error())
}

// NOTE(sr): This adapter code is used to ensure that whatever otel logs, now or
// in the future, will end up in "our" logs, and not go through whatever defaults
// it has set up with its global logger. As such, it's to a full-featured
// implementation fo the logr.LogSink interface, but a rather minimal one. Notably,
// fields are no supported, the initial runtime time info is ignored, and there is
// no support for different verbosity level is "info" logs: they're all printed
// as-is.

type sink struct {
logger logging.Logger
}

func (s *sink) Enabled(level int) bool {
return int(s.logger.GetLevel()) >= level
}

func (*sink) Init(logr.RuntimeInfo) {} // ignored

func (s *sink) Info(_ int, msg string, _ ...interface{}) {
s.logger.Info(msg)
}

func (s *sink) Error(err error, msg string, _ ...interface{}) {
s.logger.WithFields(map[string]interface{}{"err": err}).Error(msg)
}

func (s *sink) WithName(name string) logr.LogSink {
return &sink{s.logger.WithFields(map[string]interface{}{"name": name})}
}

func (s *sink) WithValues(...interface{}) logr.LogSink { // ignored
return s
}
6 changes: 3 additions & 3 deletions rego/rego.go
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/open-policy-agent/opa/bundle"
bundleUtils "github.com/open-policy-agent/opa/internal/bundle"
"github.com/open-policy-agent/opa/internal/compiler/wasm"
"github.com/open-policy-agent/opa/internal/distributedtracing"
"github.com/open-policy-agent/opa/internal/future"
"github.com/open-policy-agent/opa/internal/ir"
"github.com/open-policy-agent/opa/internal/planner"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/open-policy-agent/opa/topdown"
"github.com/open-policy-agent/opa/topdown/cache"
"github.com/open-policy-agent/opa/topdown/print"
"github.com/open-policy-agent/opa/tracing"
"github.com/open-policy-agent/opa/types"
"github.com/open-policy-agent/opa/util"
)
Expand Down Expand Up @@ -514,7 +514,7 @@ type Rego struct {
generateJSON func(*ast.Term, *EvalContext) (interface{}, error)
printHook print.Hook
enablePrintStatements bool
distributedTacingOpts distributedtracing.Options
distributedTacingOpts tracing.Options
}

// Function represents a built-in function that is callable in Rego.
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func PrintHook(h print.Hook) func(r *Rego) {
}

// DistributedTracingOpts sets the options to be used by distributed tracing.
func DistributedTracingOpts(tr distributedtracing.Options) func(r *Rego) {
func DistributedTracingOpts(tr tracing.Options) func(r *Rego) {
return func(r *Rego) {
r.distributedTacingOpts = tr
}
Expand Down
37 changes: 19 additions & 18 deletions runtime/runtime.go
Expand Up @@ -22,14 +22,13 @@ import (

"github.com/fsnotify/fsnotify"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.uber.org/automaxprocs/maxprocs"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/internal/config"
"github.com/open-policy-agent/opa/internal/distributedtracing"
internal_tracing "github.com/open-policy-agent/opa/internal/distributedtracing"
internal_logging "github.com/open-policy-agent/opa/internal/logging"
"github.com/open-policy-agent/opa/internal/prometheus"
"github.com/open-policy-agent/opa/internal/report"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/open-policy-agent/opa/server"
"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/storage/inmem"
"github.com/open-policy-agent/opa/tracing"
"github.com/open-policy-agent/opa/util"
"github.com/open-policy-agent/opa/version"
)
Expand Down Expand Up @@ -202,7 +202,7 @@ type Params struct {
// If it is nil, a new mux.Router will be created
Router *mux.Router

DistrbutedTracingOpts distributedtracing.Options
DistributedTracingOpts tracing.Options
}

// LoggingConfig stores the configuration for OPA's logging behaviour.
Expand Down Expand Up @@ -277,21 +277,21 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) {

config, err := config.Load(params.ConfigFile, params.ConfigOverrides, params.ConfigOverrideFiles)
if err != nil {
return nil, errors.Wrap(err, "config error")
return nil, fmt.Errorf("config error: %w", err)
}

var reporter *report.Reporter
if params.EnableVersionCheck {
var err error
reporter, err = report.New(params.ID, report.Options{Logger: logger})
if err != nil {
return nil, errors.Wrap(err, "config error")
return nil, fmt.Errorf("config error: %w", err)
}
}

loaded, err := initload.LoadPaths(params.Paths, params.Filter, params.BundleMode, params.BundleVerificationConfig, params.SkipBundleVerification)
if err != nil {
return nil, errors.Wrap(err, "load error")
return nil, fmt.Errorf("load error: %w", err)
}

info, err := runtime.Term(runtime.Params{Config: config})
Expand Down Expand Up @@ -327,26 +327,26 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) {
plugins.PrintHook(loggingPrintHook{logger: logger}),
plugins.WithRouter(params.Router))
if err != nil {
return nil, errors.Wrap(err, "config error")
return nil, fmt.Errorf("config error: %w", err)
}

if err := manager.Init(ctx); err != nil {
return nil, errors.Wrap(err, "initialization error")
return nil, fmt.Errorf("initialization error: %w", err)
}

metrics := prometheus.New(metrics.New(), errorLogger(logger))

traceExporter, distrbutedTracingOpts, err := distributedtracing.Init(ctx, config, params.ID)
traceExporter, distributedTracingOpts, err := internal_tracing.Init(ctx, config, params.ID)
if err != nil {
return nil, fmt.Errorf("config error: %w", err)
}
if distrbutedTracingOpts != nil {
params.DistrbutedTracingOpts = distrbutedTracingOpts
if distributedTracingOpts != nil {
params.DistributedTracingOpts = distributedTracingOpts
}

disco, err := discovery.New(manager, discovery.Factories(registeredPlugins), discovery.Metrics(metrics))
if err != nil {
return nil, errors.Wrap(err, "config error")
return nil, fmt.Errorf("config error: %w", err)
}

manager.Register("discovery", disco)
Expand Down Expand Up @@ -418,14 +418,14 @@ func (rt *Runtime) Serve(ctx context.Context) error {

if rt.traceExporter != nil {
if err := rt.traceExporter.Start(ctx); err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to start trace exporter.")
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to start OpenTelemetry trace exporter.")
return err
}

defer func() {
err := rt.traceExporter.Shutdown(ctx)
if err != nil {
rt.logger.Error("Failed to shutdown OpenTelemetry trace exporter gracefully.")
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to shutdown OpenTelemetry trace exporter gracefully.")
}
}()
}
Expand All @@ -448,7 +448,7 @@ func (rt *Runtime) Serve(ctx context.Context) error {
WithRuntime(rt.Manager.Info).
WithMetrics(rt.metrics).
WithMinTLSVersion(rt.Params.MinTLSVersion).
WithDistributedTracingOpts(rt.Params.DistrbutedTracingOpts)
WithDistributedTracingOpts(rt.Params.DistributedTracingOpts)

if rt.Params.DiagnosticAddrs != nil {
rt.server = rt.server.WithDiagnosticAddresses(*rt.Params.DiagnosticAddrs)
Expand Down Expand Up @@ -586,9 +586,10 @@ func (rt *Runtime) StartREPL(ctx context.Context) {
repl.Loop(ctx)
}

// SetDistributedTracingErrorHandler configures the distributed tracing's ErrorHandler.
func (rt *Runtime) SetDistributedTracingErrorHandler() {
distributedtracing.SetErrorHandler(rt.logger)
// SetDistributedTracingLogging configures the distributed tracing's ErrorHandler,
// and logger instances.
func (rt *Runtime) SetDistributedTracingLogging() {
internal_tracing.SetupLogging(rt.logger)
}

func (rt *Runtime) checkOPAUpdate(ctx context.Context) *report.DataResponse {
Expand Down
1 change: 1 addition & 0 deletions server/features.go
Expand Up @@ -2,6 +2,7 @@
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

//go:build opa_wasm
// +build opa_wasm

package server
Expand Down

0 comments on commit de9a48f

Please sign in to comment.