Skip to content

Commit

Permalink
Save progress
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed May 17, 2024
1 parent 6106973 commit 714a660
Show file tree
Hide file tree
Showing 8 changed files with 1,429 additions and 306 deletions.
47 changes: 46 additions & 1 deletion stats/opentelemetry/client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package opentelemetry

import (
"context"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/metadata"
"sync/atomic"
"time"

Expand Down Expand Up @@ -63,6 +65,14 @@ func (csh *clientStatsHandler) unaryInterceptor(ctx context.Context, method stri
}
ctx = setCallInfo(ctx, ci)

if csh.o.MetricsOptions.pluginOption != nil {
md := csh.o.MetricsOptions.pluginOption.GetMetadata()
val := md.Get("x-envoy-peer-metadata")
if len(val) == 1 {
ctx = metadata.AppendToOutgoingContext(ctx, metadataExchangeKey, val[0])
}
}

startTime := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
csh.perCallMetrics(ctx, err, startTime, ci)
Expand Down Expand Up @@ -98,6 +108,15 @@ func (csh *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc
method: csh.determineMethod(method, opts...),
}
ctx = setCallInfo(ctx, ci)

if csh.o.MetricsOptions.pluginOption != nil { // from incoming context right for GetLabels? well e2e test and find out :)
md := csh.o.MetricsOptions.pluginOption.GetMetadata() // metadata.MD
val := md.Get("x-envoy-peer-metadata")
if len(val) == 1 {
ctx = metadata.AppendToOutgoingContext(ctx, metadataExchangeKey, val[0])
}
}

startTime := time.Now()

callback := func(err error) {
Expand All @@ -123,12 +142,17 @@ func (csh *clientStatsHandler) HandleConn(context.Context, stats.ConnStats) {}

// TagRPC implements per RPC attempt context management.
func (csh *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
labels := &istats.Labels{
TelemetryLabels: make(map[string]string),
}
mi := &metricsInfo{ // populates information about RPC start.
startTime: time.Now(),
xDSLabels: labels.TelemetryLabels, // holds a ref to map, make sure this doesn't race (and works, just test it :D)
}
ri := &rpcInfo{
mi: mi,
}
ctx = istats.SetLabels(ctx, labels) // ctx passed is immutable, however cluster_impl writes to the map of Telemetry Labels on the heap.
return setRPCInfo(ctx, ri)
}

Expand Down Expand Up @@ -157,6 +181,17 @@ func (csh *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCS
atomic.AddInt64(&mi.recvCompressedBytes, int64(st.CompressedLength))
case *stats.End:
csh.processRPCEnd(ctx, mi, st)
case *stats.InHeader:
// Also delete target filtration...maybe same PR?
if !mi.labelsReceived && csh.o.MetricsOptions.pluginOption != nil {
mi.labels = csh.o.MetricsOptions.pluginOption.GetLabels(st.Header, mi.xDSLabels) // way to get xDS Labels set in Tag(), so I'm good here...
mi.labelsReceived = true
}
case *stats.InTrailer: // only variable is header and trailer can pull out into helper
if !mi.labelsReceived && csh.o.MetricsOptions.pluginOption != nil {
mi.labels = csh.o.MetricsOptions.pluginOption.GetLabels(st.Trailer, mi.xDSLabels) // this read should just...work right (since set in picker update (pass around a heap pointer)
mi.labelsReceived = true
}
default:
}
}
Expand All @@ -174,7 +209,17 @@ func (csh *clientStatsHandler) processRPCEnd(ctx context.Context, mi *metricsInf
st = canonicalString(s.Code())
}

clientAttributeOption := metric.WithAttributes(attribute.String("grpc.method", ci.method), attribute.String("grpc.target", ci.target), attribute.String("grpc.status", st))
attributes := []attribute.KeyValue{
attribute.String("grpc.method", ci.method),
attribute.String("grpc.target", ci.target),
attribute.String("grpc.status", st),
}

for k, v := range mi.labels {
attributes = append(attributes, attribute.String(k, v))
}

clientAttributeOption := metric.WithAttributes(attributes...)
csh.clientMetrics.attemptDuration.Record(ctx, latency, clientAttributeOption)
csh.clientMetrics.attemptSentTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.sentCompressedBytes), clientAttributeOption)
csh.clientMetrics.attemptRcvdTotalCompressedMessageSize.Record(ctx, atomic.LoadInt64(&mi.recvCompressedBytes), clientAttributeOption)
Expand Down
92 changes: 92 additions & 0 deletions stats/opentelemetry/csm/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package csm

import (
"context"
"net/url"

"google.golang.org/grpc"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/stats/opentelemetry"
otelinternal "google.golang.org/grpc/stats/opentelemetry/internal"
)

var clientSideOTelWithCSM grpc.DialOption
var clientSideOTel grpc.DialOption

// Observability sets up CSM Observability for the binary globally. It sets up
// two client side OpenTelemetry instrumentation components configured with the
// provided options, one with a CSM Plugin Option configured, one without.
// Created channels will pick up one of these dependent on whether channels are
// CSM Channels or not, which is derived from the Channel's parsed target after
// dialing.
//
// It sets up a server side OpenTelemetry instrumentation component configured
// with options provided alongside a CSM Plugin Option to be registered globally
// and picked up for every server.
//
// The CSM Plugin Option is instantiated with local labels and metadata exchange
// labels pulled from the environment, and emits metadata exchange labels from
// the peer, local labels, and xDS Labels if provided. Context timeouts does not
// trigger an error, but sets certain labels to "unknown".
//
// This function is not thread safe, and should only be invoked once in main
// before any channels or servers are created. Returns a cleanup function to be
// deferred in main.
func Observability(ctx context.Context, options opentelemetry.Options) func() {
csmPluginOption := newPluginOption(ctx) // Do this operation in tests now...because this is global to the binary...
clientSideOTelWithCSM = dialOptionWithCSMPluginOption(options, csmPluginOption)
clientSideOTel = opentelemetry.DialOption(options)

serverSideOTelWithCSM := serverOptionWithCSMPluginOption(options, csmPluginOption)

// internal.AddGlobalLateApplyDialOptions.(func(opt ...LateApplyDialOption))(&testLateApplyDialOption{})


internal.AddGlobalServerOptions.(func(opt ...grpc.ServerOption))(serverSideOTelWithCSM) // sets the global - not intended to be used with o11y so we're good here...

return func() {
internal.ClearGlobalServerOptions()
internal.ClearGlobalLateApplyDialOptions()
}
} // this won't actually compile unless I rebase

type lateApplyDialOption interface { // doing this in other PR...although idk how if this will be able to access...
DialOption(parsedTarget *url.URL) grpc.DialOption
}

// Just call this with a pointer...
// Do I register just this function or do I need to make this on an object...
func DialOption(parsedTarget *url.URL) grpc.DialOption {
if determineTargetCSM(parsedTarget) {
return clientSideOTelWithCSM
}
return clientSideOTel
}

func dialOptionWithCSMPluginOption(options opentelemetry.Options, po otelinternal.PluginOption) grpc.DialOption { // test at this layer and replace grpc.DialOption in OTel tests...
otelinternal.SetPluginOption.(func(options opentelemetry.Options, po otelinternal.PluginOption))(options, po)
return opentelemetry.DialOption(options)
}

func serverOptionWithCSMPluginOption(options opentelemetry.Options, po otelinternal.PluginOption) grpc.ServerOption {
otelinternal.SetPluginOption.(func(options opentelemetry.Options, po otelinternal.PluginOption))(options, po)
return opentelemetry.ServerOption(options)
}

0 comments on commit 714a660

Please sign in to comment.