Skip to content

Commit

Permalink
Restore baggage support in HotROD πŸš— (#4225)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- The baggage propagation was broken in HotROD after #4187
- This restores the baggage support by providing a few integration
points with OTEL
- [ ] This change will not work until OTEL Baggage & Bridge bugs are
fixed: open-telemetry/opentelemetry-go#3685

## Short description of the changes
- [`index.html`] The webapp used to send baggage via `jaeger-baggage`
header, this is now changed to W3C `baggage` header
- [`mux.go`] The Jaeger SDK was able to accept `jaeger-baggage` header
even for requests without am active trace. OTEL Bridge does not support
that, so we use OTEL's Baggage propagator to manually extract the
baggage into Context, and once the Bridge creates a Span, we copy OTEL
baggage from Context into the Span.
- All other changes are cosmetic / logging related

---------

Signed-off-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
yurishkuro committed Feb 9, 2023
1 parent 7ee4c10 commit fedeb4c
Show file tree
Hide file tree
Showing 16 changed files with 99 additions and 39 deletions.
1 change: 0 additions & 1 deletion examples/hotrod/cmd/customer.go
Expand Up @@ -46,5 +46,4 @@ var customerCmd = &cobra.Command{

func init() {
RootCmd.AddCommand(customerCmd)

}
1 change: 0 additions & 1 deletion examples/hotrod/cmd/driver.go
Expand Up @@ -46,5 +46,4 @@ var driverCmd = &cobra.Command{

func init() {
RootCmd.AddCommand(driverCmd)

}
1 change: 0 additions & 1 deletion examples/hotrod/cmd/frontend.go
Expand Up @@ -56,5 +56,4 @@ var options frontend.ConfigOptions

func init() {
RootCmd.AddCommand(frontendCmd)

}
24 changes: 18 additions & 6 deletions examples/hotrod/cmd/root.go
Expand Up @@ -36,6 +36,7 @@ var (
logger *zap.Logger
metricsFactory metrics.Factory
otelExporter string // jaeger, otlp, stdout
verbose bool

fixDBConnDelay time.Duration
fixDBConnDisableMutex bool
Expand Down Expand Up @@ -84,17 +85,28 @@ func init() {
RootCmd.PersistentFlags().StringVarP(&basepath, "basepath", "b", "", `Basepath for frontend service(default "/")`)
RootCmd.PersistentFlags().StringVarP(&jaegerUI, "jaeger-ui", "j", "http://localhost:16686", "Address of Jaeger UI to create [find trace] links")

rand.Seed(int64(time.Now().Nanosecond()))
logger, _ = zap.NewDevelopment(
zap.AddStacktrace(zapcore.FatalLevel),
zap.AddCallerSkip(1),
)
jaegerclientenv2otel.MapJaegerToOtelEnvVars(logger)
RootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Enables debug logging")

cobra.OnInitialize(onInitialize)
}

// onInitialize is called before the command is executed.
func onInitialize() {
rand.Seed(int64(time.Now().Nanosecond()))

zapOptions := []zap.Option{
zap.AddStacktrace(zapcore.FatalLevel),
zap.AddCallerSkip(1),
}
if !verbose {
zapOptions = append(zapOptions,
zap.IncreaseLevel(zap.LevelEnablerFunc(func(l zapcore.Level) bool { return l != zapcore.DebugLevel })),
)
}
logger, _ = zap.NewDevelopment(zapOptions...)

jaegerclientenv2otel.MapJaegerToOtelEnvVars(logger)

switch metricsBackend {
case "expvar":
metricsFactory = expvar.NewFactory(10) // 10 buckets for histograms
Expand Down
1 change: 0 additions & 1 deletion examples/hotrod/cmd/route.go
Expand Up @@ -46,5 +46,4 @@ var routeCmd = &cobra.Command{

func init() {
RootCmd.AddCommand(routeCmd)

}
2 changes: 1 addition & 1 deletion examples/hotrod/pkg/log/factory.go
Expand Up @@ -37,7 +37,7 @@ func NewFactory(logger *zap.Logger) Factory {

// Bg creates a context-unaware logger.
func (b Factory) Bg() Logger {
return logger(b)
return wrapper(b)
}

// For returns a context-aware Logger. If the context
Expand Down
20 changes: 13 additions & 7 deletions examples/hotrod/pkg/log/logger.go
Expand Up @@ -22,33 +22,39 @@ import (

// Logger is a simplified abstraction of the zap.Logger
type Logger interface {
Debug(msg string, fields ...zapcore.Field)
Info(msg string, fields ...zapcore.Field)
Error(msg string, fields ...zapcore.Field)
Fatal(msg string, fields ...zapcore.Field)
With(fields ...zapcore.Field) Logger
}

// logger delegates all calls to the underlying zap.Logger
type logger struct {
// wrapper delegates all calls to the underlying zap.Logger
type wrapper struct {
logger *zap.Logger
}

// Debug logs an debug msg with fields
func (l wrapper) Debug(msg string, fields ...zapcore.Field) {
l.logger.Debug(msg, fields...)
}

// Info logs an info msg with fields
func (l logger) Info(msg string, fields ...zapcore.Field) {
func (l wrapper) Info(msg string, fields ...zapcore.Field) {
l.logger.Info(msg, fields...)
}

// Error logs an error msg with fields
func (l logger) Error(msg string, fields ...zapcore.Field) {
func (l wrapper) Error(msg string, fields ...zapcore.Field) {
l.logger.Error(msg, fields...)
}

// Fatal logs a fatal error msg with fields
func (l logger) Fatal(msg string, fields ...zapcore.Field) {
func (l wrapper) Fatal(msg string, fields ...zapcore.Field) {
l.logger.Fatal(msg, fields...)
}

// With creates a child logger, and optionally adds some context fields to that logger.
func (l logger) With(fields ...zapcore.Field) Logger {
return logger{logger: l.logger.With(fields...)}
func (l wrapper) With(fields ...zapcore.Field) Logger {
return wrapper{logger: l.logger.With(fields...)}
}
5 changes: 5 additions & 0 deletions examples/hotrod/pkg/log/spanlogger.go
Expand Up @@ -31,6 +31,11 @@ type spanLogger struct {
spanFields []zapcore.Field
}

func (sl spanLogger) Debug(msg string, fields ...zapcore.Field) {
sl.logToSpan("Debug", msg, fields...)
sl.logger.Debug(msg, append(sl.spanFields, fields...)...)
}

func (sl spanLogger) Info(msg string, fields ...zapcore.Field) {
sl.logToSpan("info", msg, fields...)
sl.logger.Info(msg, append(sl.spanFields, fields...)...)
Expand Down
10 changes: 7 additions & 3 deletions examples/hotrod/pkg/tracing/init.go
Expand Up @@ -44,14 +44,18 @@ var once sync.Once
// to return an OpenTracing-compatible tracer.
func Init(serviceName string, exporterType string, metricsFactory metrics.Factory, logger log.Factory) opentracing.Tracer {
once.Do(func() {
otel.SetTextMapPropagator(propagation.TraceContext{})
otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))
})

exp, err := createOtelExporter(exporterType)
if err != nil {
logger.Bg().Fatal("cannot create exporter", zap.String("exporterType", exporterType), zap.Error(err))
}
logger.Bg().Info("using " + exporterType + " trace exporter")
logger.Bg().Debug("using " + exporterType + " trace exporter")

rpcmetricsObserver := rpcmetrics.NewObserver(metricsFactory, rpcmetrics.DefaultNameNormalizer)

Expand All @@ -64,7 +68,7 @@ func Init(serviceName string, exporterType string, metricsFactory metrics.Factor
)),
)
otTracer, _ := otbridge.NewTracerPair(tp.Tracer(""))
logger.Bg().Info("created OTEL->OT brige", zap.String("service-name", serviceName))
logger.Bg().Debug("created OTEL->OT brige", zap.String("service-name", serviceName))
return otTracer
}

Expand Down
56 changes: 47 additions & 9 deletions examples/hotrod/pkg/tracing/mux.go
Expand Up @@ -20,34 +20,72 @@ import (

"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/examples/hotrod/pkg/log"
)

// NewServeMux creates a new TracedServeMux.
func NewServeMux(tracer opentracing.Tracer) *TracedServeMux {
func NewServeMux(copyBaggage bool, tracer opentracing.Tracer, logger log.Factory) *TracedServeMux {
return &TracedServeMux{
mux: http.NewServeMux(),
tracer: tracer,
mux: http.NewServeMux(),
copyBaggage: copyBaggage,
tracer: tracer,
logger: logger,
}
}

// TracedServeMux is a wrapper around http.ServeMux that instruments handlers for tracing.
type TracedServeMux struct {
mux *http.ServeMux
tracer opentracing.Tracer
mux *http.ServeMux
copyBaggage bool
tracer opentracing.Tracer
logger log.Factory
}

// Handle implements http.ServeMux#Handle
// Handle implements http.ServeMux#Handle, which is used to register new handler.
func (tm *TracedServeMux) Handle(pattern string, handler http.Handler) {
tm.logger.Bg().Debug("registering traced handler", zap.String("endpoint", pattern))

middleware := nethttp.Middleware(
tm.tracer,
handler,
nethttp.OperationNameFunc(func(r *http.Request) string {
return "HTTP " + r.Method + " " + pattern
}))
tm.mux.Handle(pattern, middleware)
}),
// Jaeger SDK was able to accept `jaeger-baggage` header even for requests without am active trace.
// OTEL Bridge does not support that, so we use Baggage propagator to manually extract the baggage
// into Context (in otelBaggageExtractor handler below), and once the Bridge creates a Span,
// we use this SpanObserver to copy OTEL baggage from Context into the Span.
nethttp.MWSpanObserver(func(span opentracing.Span, r *http.Request) {
if !tm.copyBaggage {
return
}
bag := baggage.FromContext(r.Context())
for _, m := range bag.Members() {
if b := span.BaggageItem(m.Key()); b == "" {
span.SetBaggageItem(m.Key(), m.Value())
}
}
}),
)
tm.mux.Handle(pattern, otelBaggageExtractor(middleware))
}

// ServeHTTP implements http.ServeMux#ServeHTTP
// ServeHTTP implements http.ServeMux#ServeHTTP.
func (tm *TracedServeMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
tm.mux.ServeHTTP(w, r)
}

// Used with nethttp.MWSpanObserver above.
func otelBaggageExtractor(next http.Handler) http.Handler {
propagator := propagation.Baggage{}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
carrier := propagation.HeaderCarrier(r.Header)
ctx := propagator.Extract(r.Context(), carrier)
r = r.WithContext(ctx)
next.ServeHTTP(w, r)
})
}
1 change: 0 additions & 1 deletion examples/hotrod/services/customer/client.go
Expand Up @@ -54,7 +54,6 @@ func (c *Client) Get(ctx context.Context, customerID string) (*Customer, error)
c.logger.For(ctx).Info("Getting customer", zap.String("customer_id", customerID))

url := fmt.Sprintf("http://"+c.hostPort+"/customer?customer=%s", customerID)
fmt.Println(url)
var customer Customer
if err := c.client.GetJSON(ctx, "/customer", url, &customer); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion examples/hotrod/services/customer/server.go
Expand Up @@ -57,7 +57,7 @@ func (s *Server) Run() error {
}

func (s *Server) createServeMux() http.Handler {
mux := tracing.NewServeMux(s.tracer)
mux := tracing.NewServeMux(false, s.tracer, s.logger)
mux.Handle("/customer", http.HandlerFunc(s.customer))
return mux
}
Expand Down
8 changes: 4 additions & 4 deletions examples/hotrod/services/driver/server.go
Expand Up @@ -43,10 +43,10 @@ var _ DriverServiceServer = (*Server)(nil)
// NewServer creates a new driver.Server
func NewServer(hostPort string, otelExporter string, metricsFactory metrics.Factory, logger log.Factory) *Server {
tracer := tracing.Init("driver", otelExporter, metricsFactory, logger)
server := grpc.NewServer(grpc.UnaryInterceptor(
otgrpc.OpenTracingServerInterceptor(tracer)),
grpc.StreamInterceptor(
otgrpc.OpenTracingStreamServerInterceptor(tracer)))
server := grpc.NewServer(
grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)),
grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer)),
)
return &Server{
hostPort: hostPort,
tracer: tracer,
Expand Down
2 changes: 1 addition & 1 deletion examples/hotrod/services/frontend/server.go
Expand Up @@ -76,7 +76,7 @@ func (s *Server) Run() error {
}

func (s *Server) createServeMux() http.Handler {
mux := tracing.NewServeMux(s.tracer)
mux := tracing.NewServeMux(true, s.tracer, s.logger)
p := path.Join("/", s.basepath)
mux.Handle(p, http.StripPrefix(p, http.FileServer(s.assetFS)))
mux.Handle(path.Join(p, "/dispatch"), http.HandlerFunc(s.dispatch))
Expand Down
2 changes: 1 addition & 1 deletion examples/hotrod/services/frontend/web_assets/index.html
Expand Up @@ -69,7 +69,7 @@ <h4>πŸš— <em>Rides On Demand</em> πŸš—</h4>
var freshCar = $($("#hotrod-log").prepend('<div class="fresh-car"><em>Dispatching a car...[req: '+requestID+']</em></div>').children()[0]);
var customer = evt.target.dataset.customer;
var headers = {
'jaeger-baggage': 'session=' + clientUUID + ', request=' + requestID
'baggage': 'session=' + clientUUID + ', request=' + requestID
};
console.log(headers);
var before = Date.now();
Expand Down
2 changes: 1 addition & 1 deletion examples/hotrod/services/route/server.go
Expand Up @@ -59,7 +59,7 @@ func (s *Server) Run() error {
}

func (s *Server) createServeMux() http.Handler {
mux := tracing.NewServeMux(s.tracer)
mux := tracing.NewServeMux(false, s.tracer, s.logger)
mux.Handle("/route", http.HandlerFunc(s.route))
mux.Handle("/debug/vars", expvar.Handler()) // expvar
mux.Handle("/metrics", promhttp.Handler()) // Prometheus
Expand Down

0 comments on commit fedeb4c

Please sign in to comment.