Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: make otel dependency optional for rego+topdown #4127

Merged
merged 5 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

//go:build opa_wasm
// +build opa_wasm

package cmd
Expand Down
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func initRuntime(ctx context.Context, params runCmdParams, args []string) (*runt
return nil, err
}

rt.SetDistributedTracingErrorHandler()
rt.SetDistributedTracingLogging()

return rt, nil
}
Expand Down
34 changes: 34 additions & 0 deletions features/tracing/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package tracing

import (
"net/http"

pkg_tracing "github.com/open-policy-agent/opa/tracing"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

func init() {
pkg_tracing.RegisterHTTPTracing(&factory{})
}

type factory struct{}

func (*factory) NewTransport(tr http.RoundTripper, opts pkg_tracing.Options) http.RoundTripper {
return otelhttp.NewTransport(tr, convertOpts(opts)...)
}

func (*factory) NewHandler(f http.Handler, label string, opts pkg_tracing.Options) http.Handler {
return otelhttp.NewHandler(f, label, convertOpts(opts)...)
}

func convertOpts(opts pkg_tracing.Options) []otelhttp.Option {
otelOpts := make([]otelhttp.Option, 0, len(opts))
for _, opt := range opts {
otelOpts = append(otelOpts, opt.(otelhttp.Option))
}
return otelOpts
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/fsnotify/fsnotify v1.5.1
github.com/ghodss/yaml v1.0.0
github.com/go-ini/ini v1.66.2
github.com/go-logr/logr v1.2.1
github.com/gobwas/glob v0.2.3
github.com/golang/glog v1.0.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand Down
71 changes: 58 additions & 13 deletions internal/distributedtracing/distributedtracing.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package distributedtracing

import (
Expand All @@ -7,9 +11,7 @@ import (
"fmt"
"io/ioutil"

"github.com/open-policy-agent/opa/config"
"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/util"
"github.com/go-logr/logr"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
Expand All @@ -19,6 +21,16 @@ import (
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
"google.golang.org/grpc/credentials"

"github.com/open-policy-agent/opa/config"
"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/tracing"
"github.com/open-policy-agent/opa/util"

// The import registers opentelemetry with the top-level `tracing` package,
// so the latter can be used from rego/topdown without an explicit build-time
// dependency.
_ "github.com/open-policy-agent/opa/features/tracing"
)

const (
Expand Down Expand Up @@ -55,9 +67,7 @@ type distributedTracingConfig struct {
TLSCACertFile string `json:"tls_ca_cert_file,omitempty"`
}

type Options []otelhttp.Option

func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace.Exporter, options Options, err error) {
func Init(ctx context.Context, raw []byte, id string) (*otlptrace.Exporter, tracing.Options, error) {
parsedConfig, err := config.ParseConfig(raw, id)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -87,8 +97,10 @@ func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace.
return nil, nil, err
}

traceExporter = otlptracegrpc.NewUnstarted(otlptracegrpc.WithEndpoint(distributedTracingConfig.Address),
tlsOption)
traceExporter := otlptracegrpc.NewUnstarted(
otlptracegrpc.WithEndpoint(distributedTracingConfig.Address),
tlsOption,
)

res, err := resource.New(ctx,
resource.WithAttributes(
Expand All @@ -105,18 +117,17 @@ func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace.
trace.WithSpanProcessor(trace.NewBatchSpanProcessor(traceExporter)),
)

options = append(options,
options := tracing.NewOptions(
otelhttp.WithTracerProvider(traceProvider),
otelhttp.WithPropagators(propagation.TraceContext{}),
)

return traceExporter, options, nil
}

func SetErrorHandler(logger logging.Logger) {
otel.SetErrorHandler(&errorHandler{
logger: logger,
})
func SetupLogging(logger logging.Logger) {
otel.SetErrorHandler(&errorHandler{logger: logger})
otel.SetLogger(logr.New(&sink{logger: logger}))
}

func parseDistributedTracingConfig(raw []byte) (*distributedTracingConfig, error) {
Expand Down Expand Up @@ -241,3 +252,37 @@ type errorHandler struct {
func (e *errorHandler) Handle(err error) {
e.logger.Warn("Distributed tracing: " + err.Error())
}

// NOTE(sr): This adapter code is used to ensure that whatever otel logs, now or
// in the future, will end up in "our" logs, and not go through whatever defaults
// it has set up with its global logger. As such, it's to a full-featured
// implementation fo the logr.LogSink interface, but a rather minimal one. Notably,
// fields are no supported, the initial runtime time info is ignored, and there is
// no support for different verbosity level is "info" logs: they're all printed
// as-is.

type sink struct {
logger logging.Logger
}

func (s *sink) Enabled(level int) bool {
return int(s.logger.GetLevel()) >= level
}

func (*sink) Init(logr.RuntimeInfo) {} // ignored

func (s *sink) Info(_ int, msg string, _ ...interface{}) {
s.logger.Info(msg)
}

func (s *sink) Error(err error, msg string, _ ...interface{}) {
s.logger.WithFields(map[string]interface{}{"err": err}).Error(msg)
}

func (s *sink) WithName(name string) logr.LogSink {
return &sink{s.logger.WithFields(map[string]interface{}{"name": name})}
}

func (s *sink) WithValues(...interface{}) logr.LogSink { // ignored
return s
}
6 changes: 3 additions & 3 deletions rego/rego.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/open-policy-agent/opa/bundle"
bundleUtils "github.com/open-policy-agent/opa/internal/bundle"
"github.com/open-policy-agent/opa/internal/compiler/wasm"
"github.com/open-policy-agent/opa/internal/distributedtracing"
"github.com/open-policy-agent/opa/internal/future"
"github.com/open-policy-agent/opa/internal/ir"
"github.com/open-policy-agent/opa/internal/planner"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/open-policy-agent/opa/topdown"
"github.com/open-policy-agent/opa/topdown/cache"
"github.com/open-policy-agent/opa/topdown/print"
"github.com/open-policy-agent/opa/tracing"
"github.com/open-policy-agent/opa/types"
"github.com/open-policy-agent/opa/util"
)
Expand Down Expand Up @@ -514,7 +514,7 @@ type Rego struct {
generateJSON func(*ast.Term, *EvalContext) (interface{}, error)
printHook print.Hook
enablePrintStatements bool
distributedTacingOpts distributedtracing.Options
distributedTacingOpts tracing.Options
}

// Function represents a built-in function that is callable in Rego.
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func PrintHook(h print.Hook) func(r *Rego) {
}

// DistributedTracingOpts sets the options to be used by distributed tracing.
func DistributedTracingOpts(tr distributedtracing.Options) func(r *Rego) {
func DistributedTracingOpts(tr tracing.Options) func(r *Rego) {
return func(r *Rego) {
r.distributedTacingOpts = tr
}
Expand Down
37 changes: 19 additions & 18 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import (

"github.com/fsnotify/fsnotify"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.uber.org/automaxprocs/maxprocs"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/internal/config"
"github.com/open-policy-agent/opa/internal/distributedtracing"
internal_tracing "github.com/open-policy-agent/opa/internal/distributedtracing"
internal_logging "github.com/open-policy-agent/opa/internal/logging"
"github.com/open-policy-agent/opa/internal/prometheus"
"github.com/open-policy-agent/opa/internal/report"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/open-policy-agent/opa/server"
"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/storage/inmem"
"github.com/open-policy-agent/opa/tracing"
"github.com/open-policy-agent/opa/util"
"github.com/open-policy-agent/opa/version"
)
Expand Down Expand Up @@ -202,7 +202,7 @@ type Params struct {
// If it is nil, a new mux.Router will be created
Router *mux.Router

DistrbutedTracingOpts distributedtracing.Options
DistributedTracingOpts tracing.Options
}

// LoggingConfig stores the configuration for OPA's logging behaviour.
Expand Down Expand Up @@ -277,21 +277,21 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) {

config, err := config.Load(params.ConfigFile, params.ConfigOverrides, params.ConfigOverrideFiles)
if err != nil {
return nil, errors.Wrap(err, "config error")
return nil, fmt.Errorf("config error: %w", err)
}

var reporter *report.Reporter
if params.EnableVersionCheck {
var err error
reporter, err = report.New(params.ID, report.Options{Logger: logger})
if err != nil {
return nil, errors.Wrap(err, "config error")
return nil, fmt.Errorf("config error: %w", err)
}
}

loaded, err := initload.LoadPaths(params.Paths, params.Filter, params.BundleMode, params.BundleVerificationConfig, params.SkipBundleVerification)
if err != nil {
return nil, errors.Wrap(err, "load error")
return nil, fmt.Errorf("load error: %w", err)
}

info, err := runtime.Term(runtime.Params{Config: config})
Expand Down Expand Up @@ -327,26 +327,26 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) {
plugins.PrintHook(loggingPrintHook{logger: logger}),
plugins.WithRouter(params.Router))
if err != nil {
return nil, errors.Wrap(err, "config error")
return nil, fmt.Errorf("config error: %w", err)
}

if err := manager.Init(ctx); err != nil {
return nil, errors.Wrap(err, "initialization error")
return nil, fmt.Errorf("initialization error: %w", err)
}

metrics := prometheus.New(metrics.New(), errorLogger(logger))

traceExporter, distrbutedTracingOpts, err := distributedtracing.Init(ctx, config, params.ID)
traceExporter, distributedTracingOpts, err := internal_tracing.Init(ctx, config, params.ID)
if err != nil {
return nil, fmt.Errorf("config error: %w", err)
}
if distrbutedTracingOpts != nil {
params.DistrbutedTracingOpts = distrbutedTracingOpts
if distributedTracingOpts != nil {
params.DistributedTracingOpts = distributedTracingOpts
}

disco, err := discovery.New(manager, discovery.Factories(registeredPlugins), discovery.Metrics(metrics))
if err != nil {
return nil, errors.Wrap(err, "config error")
return nil, fmt.Errorf("config error: %w", err)
}

manager.Register("discovery", disco)
Expand Down Expand Up @@ -418,14 +418,14 @@ func (rt *Runtime) Serve(ctx context.Context) error {

if rt.traceExporter != nil {
if err := rt.traceExporter.Start(ctx); err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to start trace exporter.")
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to start OpenTelemetry trace exporter.")
return err
}

defer func() {
err := rt.traceExporter.Shutdown(ctx)
if err != nil {
rt.logger.Error("Failed to shutdown OpenTelemetry trace exporter gracefully.")
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to shutdown OpenTelemetry trace exporter gracefully.")
}
}()
}
Expand All @@ -448,7 +448,7 @@ func (rt *Runtime) Serve(ctx context.Context) error {
WithRuntime(rt.Manager.Info).
WithMetrics(rt.metrics).
WithMinTLSVersion(rt.Params.MinTLSVersion).
WithDistributedTracingOpts(rt.Params.DistrbutedTracingOpts)
WithDistributedTracingOpts(rt.Params.DistributedTracingOpts)

if rt.Params.DiagnosticAddrs != nil {
rt.server = rt.server.WithDiagnosticAddresses(*rt.Params.DiagnosticAddrs)
Expand Down Expand Up @@ -586,9 +586,10 @@ func (rt *Runtime) StartREPL(ctx context.Context) {
repl.Loop(ctx)
}

// SetDistributedTracingErrorHandler configures the distributed tracing's ErrorHandler.
func (rt *Runtime) SetDistributedTracingErrorHandler() {
distributedtracing.SetErrorHandler(rt.logger)
// SetDistributedTracingLogging configures the distributed tracing's ErrorHandler,
// and logger instances.
func (rt *Runtime) SetDistributedTracingLogging() {
internal_tracing.SetupLogging(rt.logger)
}

func (rt *Runtime) checkOPAUpdate(ctx context.Context) *report.DataResponse {
Expand Down
1 change: 1 addition & 0 deletions server/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

//go:build opa_wasm
// +build opa_wasm

package server
Expand Down