Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: make otel dependency optional for rego+topdown #4127

Merged
merged 5 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/features.go
Expand Up @@ -2,6 +2,7 @@
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

//go:build opa_wasm
// +build opa_wasm

package cmd
Expand Down
34 changes: 34 additions & 0 deletions features/tracing/tracing.go
@@ -0,0 +1,34 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package tracing

import (
"net/http"

pkg_tracing "github.com/open-policy-agent/opa/tracing"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)

func init() {
pkg_tracing.RegisterHTTPTracing(&factory{})
}

type factory struct{}

func (*factory) NewTransport(tr http.RoundTripper, opts pkg_tracing.Options) http.RoundTripper {
return otelhttp.NewTransport(tr, convertOpts(opts)...)
}

func (*factory) NewHandler(f http.Handler, label string, opts pkg_tracing.Options) http.Handler {
return otelhttp.NewHandler(f, label, convertOpts(opts)...)
}

func convertOpts(opts pkg_tracing.Options) []otelhttp.Option {
otelOpts := make([]otelhttp.Option, 0, len(opts))
for _, opt := range opts {
otelOpts = append(otelOpts, opt.(otelhttp.Option))
}
return otelOpts
}
29 changes: 20 additions & 9 deletions internal/distributedtracing/distributedtracing.go
@@ -1,3 +1,7 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package distributedtracing

import (
Expand All @@ -7,9 +11,6 @@ import (
"fmt"
"io/ioutil"

"github.com/open-policy-agent/opa/config"
"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/util"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
Expand All @@ -19,6 +20,16 @@ import (
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
"google.golang.org/grpc/credentials"

"github.com/open-policy-agent/opa/config"
"github.com/open-policy-agent/opa/logging"
"github.com/open-policy-agent/opa/tracing"
"github.com/open-policy-agent/opa/util"

// The import registers opentelemetry with the top-level `tracing` package,
// so the latter can be used from rego/topdown without an explicit build-time
// dependency.
_ "github.com/open-policy-agent/opa/features/tracing"
)

const (
Expand Down Expand Up @@ -55,9 +66,7 @@ type distributedTracingConfig struct {
TLSCACertFile string `json:"tls_ca_cert_file,omitempty"`
}

type Options []otelhttp.Option

func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace.Exporter, options Options, err error) {
func Init(ctx context.Context, raw []byte, id string) (*otlptrace.Exporter, tracing.Options, error) {
parsedConfig, err := config.ParseConfig(raw, id)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -87,8 +96,10 @@ func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace.
return nil, nil, err
}

traceExporter = otlptracegrpc.NewUnstarted(otlptracegrpc.WithEndpoint(distributedTracingConfig.Address),
tlsOption)
traceExporter := otlptracegrpc.NewUnstarted(
otlptracegrpc.WithEndpoint(distributedTracingConfig.Address),
tlsOption,
)

res, err := resource.New(ctx,
resource.WithAttributes(
Expand All @@ -105,7 +116,7 @@ func Init(ctx context.Context, raw []byte, id string) (traceExporter *otlptrace.
trace.WithSpanProcessor(trace.NewBatchSpanProcessor(traceExporter)),
)

options = append(options,
options := tracing.NewOptions(
otelhttp.WithTracerProvider(traceProvider),
otelhttp.WithPropagators(propagation.TraceContext{}),
)
Expand Down
6 changes: 3 additions & 3 deletions rego/rego.go
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/open-policy-agent/opa/bundle"
bundleUtils "github.com/open-policy-agent/opa/internal/bundle"
"github.com/open-policy-agent/opa/internal/compiler/wasm"
"github.com/open-policy-agent/opa/internal/distributedtracing"
"github.com/open-policy-agent/opa/internal/future"
"github.com/open-policy-agent/opa/internal/ir"
"github.com/open-policy-agent/opa/internal/planner"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/open-policy-agent/opa/topdown"
"github.com/open-policy-agent/opa/topdown/cache"
"github.com/open-policy-agent/opa/topdown/print"
"github.com/open-policy-agent/opa/tracing"
"github.com/open-policy-agent/opa/types"
"github.com/open-policy-agent/opa/util"
)
Expand Down Expand Up @@ -514,7 +514,7 @@ type Rego struct {
generateJSON func(*ast.Term, *EvalContext) (interface{}, error)
printHook print.Hook
enablePrintStatements bool
distributedTacingOpts distributedtracing.Options
distributedTacingOpts tracing.Options
}

// Function represents a built-in function that is callable in Rego.
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func PrintHook(h print.Hook) func(r *Rego) {
}

// DistributedTracingOpts sets the options to be used by distributed tracing.
func DistributedTracingOpts(tr distributedtracing.Options) func(r *Rego) {
func DistributedTracingOpts(tr tracing.Options) func(r *Rego) {
return func(r *Rego) {
r.distributedTacingOpts = tr
}
Expand Down
32 changes: 16 additions & 16 deletions runtime/runtime.go
Expand Up @@ -22,14 +22,13 @@ import (

"github.com/fsnotify/fsnotify"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.uber.org/automaxprocs/maxprocs"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/internal/config"
"github.com/open-policy-agent/opa/internal/distributedtracing"
internal_tracing "github.com/open-policy-agent/opa/internal/distributedtracing"
internal_logging "github.com/open-policy-agent/opa/internal/logging"
"github.com/open-policy-agent/opa/internal/prometheus"
"github.com/open-policy-agent/opa/internal/report"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/open-policy-agent/opa/server"
"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/storage/inmem"
"github.com/open-policy-agent/opa/tracing"
"github.com/open-policy-agent/opa/util"
"github.com/open-policy-agent/opa/version"
)
Expand Down Expand Up @@ -202,7 +202,7 @@ type Params struct {
// If it is nil, a new mux.Router will be created
Router *mux.Router

DistrbutedTracingOpts distributedtracing.Options
DistributedTracingOpts tracing.Options
}

// LoggingConfig stores the configuration for OPA's logging behaviour.
Expand Down Expand Up @@ -277,21 +277,21 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) {

config, err := config.Load(params.ConfigFile, params.ConfigOverrides, params.ConfigOverrideFiles)
if err != nil {
return nil, errors.Wrap(err, "config error")
return nil, fmt.Errorf("config error: %w", err)
}

var reporter *report.Reporter
if params.EnableVersionCheck {
var err error
reporter, err = report.New(params.ID, report.Options{Logger: logger})
if err != nil {
return nil, errors.Wrap(err, "config error")
return nil, fmt.Errorf("config error: %w", err)
}
}

loaded, err := initload.LoadPaths(params.Paths, params.Filter, params.BundleMode, params.BundleVerificationConfig, params.SkipBundleVerification)
if err != nil {
return nil, errors.Wrap(err, "load error")
return nil, fmt.Errorf("load error: %w", err)
}

info, err := runtime.Term(runtime.Params{Config: config})
Expand Down Expand Up @@ -327,26 +327,26 @@ func NewRuntime(ctx context.Context, params Params) (*Runtime, error) {
plugins.PrintHook(loggingPrintHook{logger: logger}),
plugins.WithRouter(params.Router))
if err != nil {
return nil, errors.Wrap(err, "config error")
return nil, fmt.Errorf("config error: %w", err)
}

if err := manager.Init(ctx); err != nil {
return nil, errors.Wrap(err, "initialization error")
return nil, fmt.Errorf("initialization error: %w", err)
}

metrics := prometheus.New(metrics.New(), errorLogger(logger))

traceExporter, distrbutedTracingOpts, err := distributedtracing.Init(ctx, config, params.ID)
traceExporter, distributedTracingOpts, err := internal_tracing.Init(ctx, config, params.ID)
if err != nil {
return nil, fmt.Errorf("config error: %w", err)
}
if distrbutedTracingOpts != nil {
params.DistrbutedTracingOpts = distrbutedTracingOpts
if distributedTracingOpts != nil {
params.DistributedTracingOpts = distributedTracingOpts
}

disco, err := discovery.New(manager, discovery.Factories(registeredPlugins), discovery.Metrics(metrics))
if err != nil {
return nil, errors.Wrap(err, "config error")
return nil, fmt.Errorf("config error: %w", err)
}

manager.Register("discovery", disco)
Expand Down Expand Up @@ -418,14 +418,14 @@ func (rt *Runtime) Serve(ctx context.Context) error {

if rt.traceExporter != nil {
if err := rt.traceExporter.Start(ctx); err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to start trace exporter.")
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to start OpenTelemetry trace exporter.")
return err
}

defer func() {
err := rt.traceExporter.Shutdown(ctx)
if err != nil {
rt.logger.Error("Failed to shutdown OpenTelemetry trace exporter gracefully.")
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to shutdown OpenTelemetry trace exporter gracefully.")
}
}()
}
Expand All @@ -448,7 +448,7 @@ func (rt *Runtime) Serve(ctx context.Context) error {
WithRuntime(rt.Manager.Info).
WithMetrics(rt.metrics).
WithMinTLSVersion(rt.Params.MinTLSVersion).
WithDistributedTracingOpts(rt.Params.DistrbutedTracingOpts)
WithDistributedTracingOpts(rt.Params.DistributedTracingOpts)

if rt.Params.DiagnosticAddrs != nil {
rt.server = rt.server.WithDiagnosticAddresses(*rt.Params.DiagnosticAddrs)
Expand Down Expand Up @@ -588,7 +588,7 @@ func (rt *Runtime) StartREPL(ctx context.Context) {

// SetDistributedTracingErrorHandler configures the distributed tracing's ErrorHandler.
func (rt *Runtime) SetDistributedTracingErrorHandler() {
distributedtracing.SetErrorHandler(rt.logger)
internal_tracing.SetErrorHandler(rt.logger)
}

func (rt *Runtime) checkOPAUpdate(ctx context.Context) *report.DataResponse {
Expand Down
1 change: 1 addition & 0 deletions server/features.go
Expand Up @@ -2,6 +2,7 @@
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

//go:build opa_wasm
// +build opa_wasm

package server
Expand Down
12 changes: 5 additions & 7 deletions server/server.go
Expand Up @@ -26,15 +26,12 @@ import (

"github.com/gorilla/mux"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/bundle"
"github.com/open-policy-agent/opa/internal/distributedtracing"

"github.com/open-policy-agent/opa/metrics"
"github.com/open-policy-agent/opa/plugins"
bundlePlugin "github.com/open-policy-agent/opa/plugins/bundle"
Expand All @@ -48,6 +45,7 @@ import (
"github.com/open-policy-agent/opa/topdown"
iCache "github.com/open-policy-agent/opa/topdown/cache"
"github.com/open-policy-agent/opa/topdown/lineage"
"github.com/open-policy-agent/opa/tracing"
"github.com/open-policy-agent/opa/util"
"github.com/open-policy-agent/opa/version"
)
Expand Down Expand Up @@ -131,7 +129,7 @@ type Server struct {
defaultDecisionPath string
interQueryBuiltinCache iCache.InterQueryCache
allPluginsOkOnce bool
distributedTracingOpts distributedtracing.Options
distributedTracingOpts tracing.Options
}

// Metrics defines the interface that the server requires for recording HTTP
Expand Down Expand Up @@ -338,8 +336,8 @@ func (s *Server) WithMinTLSVersion(minTLSVersion uint16) *Server {
}

// WithDistributedTracingOpts sets the options to be used by distributed tracing.
func (s *Server) WithDistributedTracingOpts(traceOpts distributedtracing.Options) *Server {
s.distributedTracingOpts = traceOpts
func (s *Server) WithDistributedTracingOpts(opts tracing.Options) *Server {
s.distributedTracingOpts = opts
return s
}

Expand Down Expand Up @@ -736,7 +734,7 @@ func (s *Server) initRouters() {
func (s *Server) instrumentHandler(handler func(http.ResponseWriter, *http.Request), label string) http.Handler {
var httpHandler http.Handler = http.HandlerFunc(handler)
if len(s.distributedTracingOpts) > 0 {
httpHandler = otelhttp.NewHandler(http.HandlerFunc(handler), label, s.distributedTracingOpts...)
httpHandler = tracing.NewHandler(httpHandler, label, s.distributedTracingOpts)
}
if s.metrics != nil {
return s.metrics.InstrumentHandler(httpHandler, label)
Expand Down
12 changes: 8 additions & 4 deletions test/e2e/distributedtracing/distributedtracing_test.go
@@ -1,3 +1,7 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package distributedtracing

import (
Expand All @@ -8,8 +12,8 @@ import (
"strings"
"testing"

"github.com/open-policy-agent/opa/internal/distributedtracing"
"github.com/open-policy-agent/opa/test/e2e"
"github.com/open-policy-agent/opa/tracing"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
Expand All @@ -21,13 +25,13 @@ var spanExporter *tracetest.InMemoryExporter

func TestMain(m *testing.M) {
spanExporter = tracetest.NewInMemoryExporter()
options := distributedtracing.Options{
options := tracing.NewOptions(
otelhttp.WithTracerProvider(trace.NewTracerProvider(trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(spanExporter)))),
}
)

flag.Parse()
testServerParams := e2e.NewAPIServerTestParams()
testServerParams.DistrbutedTracingOpts = options
testServerParams.DistributedTracingOpts = options

var err error
testRuntime, err = e2e.NewTestRuntime(testServerParams)
Expand Down