Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: add support for multiple stats handlers in a single client or server #5347

Merged
merged 6 commits into from Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion dialoptions.go
Expand Up @@ -380,7 +380,7 @@ func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
// all the RPCs and underlying network connections in this ClientConn.
func WithStatsHandler(h stats.Handler) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.StatsHandler = h
o.copts.StatsHandlers = append(o.copts.StatsHandlers, h)
})
}

Expand Down
22 changes: 11 additions & 11 deletions internal/transport/handler_server.go
Expand Up @@ -49,7 +49,7 @@ import (
// NewServerHandlerTransport returns a ServerTransport handling gRPC
// from inside an http.Handler. It requires that the http Server
// supports HTTP/2.
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) {
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
if r.ProtoMajor != 2 {
return nil, errors.New("gRPC requires HTTP/2")
}
Expand Down Expand Up @@ -138,7 +138,7 @@ type serverHandlerTransport struct {
// TODO make sure this is consistent across handler_server and http2_server
contentSubtype string

stats stats.Handler
stats []stats.Handler
}

func (ht *serverHandlerTransport) Close() {
Expand Down Expand Up @@ -228,10 +228,10 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
})

if err == nil { // transport has not been closed
if ht.stats != nil {
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
for _, sh := range ht.stats {
sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
Expand Down Expand Up @@ -314,10 +314,10 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
})

if err == nil {
if ht.stats != nil {
for _, sh := range ht.stats {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
ht.stats.HandleRPC(s.Context(), &stats.OutHeader{
sh.HandleRPC(s.Context(), &stats.OutHeader{
Header: md.Copy(),
Compression: s.sendCompress,
})
Expand Down Expand Up @@ -369,14 +369,14 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
}
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
s.ctx = peer.NewContext(ctx, pr)
if ht.stats != nil {
s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
for _, sh := range ht.stats {
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: ht.RemoteAddr(),
Compression: s.recvCompress,
}
ht.stats.HandleRPC(s.ctx, inHeader)
sh.HandleRPC(s.ctx, inHeader)
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
Expand Down
45 changes: 24 additions & 21 deletions internal/transport/http2_client.go
Expand Up @@ -90,7 +90,7 @@ type http2Client struct {
kp keepalive.ClientParameters
keepaliveEnabled bool

statsHandler stats.Handler
statsHandlers []stats.Handler

initialWindowSize int32

Expand Down Expand Up @@ -311,7 +311,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
isSecure: isSecure,
perRPCCreds: perRPCCreds,
kp: kp,
statsHandler: opts.StatsHandler,
statsHandlers: opts.StatsHandlers,
initialWindowSize: initialWindowSize,
onPrefaceReceipt: onPrefaceReceipt,
nextID: 1,
Expand Down Expand Up @@ -341,15 +341,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
updateFlowControl: t.updateFlowControl,
}
}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
for _, sh := range t.statsHandlers {
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connBegin)
sh.HandleConn(t.ctx, connBegin)
}
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
if err != nil {
Expand Down Expand Up @@ -773,24 +773,27 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
}
}
if t.statsHandler != nil {
if len(t.statsHandlers) != 0 {
header, ok := metadata.FromOutgoingContext(ctx)
if ok {
header.Set("user-agent", t.userAgent)
} else {
header = metadata.Pairs("user-agent", t.userAgent)
}
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
outHeader := &stats.OutHeader{
Client: true,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
Header: header,
for _, sh := range t.statsHandlers {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
// Note: Creating a new stats object to prevent pollution.
outHeader := &stats.OutHeader{
Client: true,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
Header: header,
}
sh.HandleRPC(s.ctx, outHeader)
}
t.statsHandler.HandleRPC(s.ctx, outHeader)
}
return s, nil
}
Expand Down Expand Up @@ -916,11 +919,11 @@ func (t *http2Client) Close(err error) {
for _, s := range streams {
t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
}
if t.statsHandler != nil {
for _, sh := range t.statsHandlers {
connEnd := &stats.ConnEnd{
Client: true,
}
t.statsHandler.HandleConn(t.ctx, connEnd)
sh.HandleConn(t.ctx, connEnd)
}
}

Expand Down Expand Up @@ -1432,22 +1435,22 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
close(s.headerChan)
}

if t.statsHandler != nil {
for _, sh := range t.statsHandlers {
if isHeader {
inHeader := &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
Header: metadata.MD(mdata).Copy(),
Compression: s.recvCompress,
}
t.statsHandler.HandleRPC(s.ctx, inHeader)
sh.HandleRPC(s.ctx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
Trailer: metadata.MD(mdata).Copy(),
}
t.statsHandler.HandleRPC(s.ctx, inTrailer)
sh.HandleRPC(s.ctx, inTrailer)
}
}

Expand Down
28 changes: 14 additions & 14 deletions internal/transport/http2_server.go
Expand Up @@ -82,7 +82,7 @@ type http2Server struct {
// updates, reset streams, and various settings) to the controller.
controlBuf *controlBuffer
fc *trInFlow
stats stats.Handler
stats []stats.Handler
// Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters
// Keepalive enforcement policy.
Expand Down Expand Up @@ -257,7 +257,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
fc: &trInFlow{limit: uint32(icwz)},
state: reachable,
activeStreams: make(map[uint32]*Stream),
stats: config.StatsHandler,
stats: config.StatsHandlers,
kp: kp,
idle: time.Now(),
kep: kep,
Expand All @@ -272,13 +272,13 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
updateFlowControl: t.updateFlowControl,
}
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
for _, sh := range t.stats {
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
sh.HandleConn(t.ctx, connBegin)
}
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
if err != nil {
Expand Down Expand Up @@ -566,8 +566,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.adjustWindow(s, uint32(n))
}
s.ctx = traceCtx(s.ctx, s.method)
if t.stats != nil {
s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
for _, sh := range t.stats {
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: t.remoteAddr,
Expand All @@ -576,7 +576,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
WireLength: int(frame.Header().Length),
Header: metadata.MD(mdata).Copy(),
}
t.stats.HandleRPC(s.ctx, inHeader)
sh.HandleRPC(s.ctx, inHeader)
}
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
Expand Down Expand Up @@ -992,14 +992,14 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
if t.stats != nil {
for _, sh := range t.stats {
// Note: Headers are compressed with hpack after this call returns.
// No WireLength field is set here.
outHeader := &stats.OutHeader{
Header: s.header.Copy(),
Compression: s.sendCompress,
}
t.stats.HandleRPC(s.Context(), outHeader)
sh.HandleRPC(s.Context(), outHeader)
}
return nil
}
Expand Down Expand Up @@ -1060,10 +1060,10 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
// Send a RST_STREAM after the trailers if the client has not already half-closed.
rst := s.getState() == streamActive
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
if t.stats != nil {
for _, sh := range t.stats {
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
Expand Down Expand Up @@ -1218,9 +1218,9 @@ func (t *http2Server) Close() {
for _, s := range streams {
s.cancel()
}
if t.stats != nil {
for _, sh := range t.stats {
connEnd := &stats.ConnEnd{}
t.stats.HandleConn(t.ctx, connEnd)
sh.HandleConn(t.ctx, connEnd)
}
}

Expand Down
6 changes: 3 additions & 3 deletions internal/transport/transport.go
Expand Up @@ -523,7 +523,7 @@ type ServerConfig struct {
ConnectionTimeout time.Duration
Credentials credentials.TransportCredentials
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
StatsHandlers []stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
Expand Down Expand Up @@ -553,8 +553,8 @@ type ConnectOptions struct {
CredsBundle credentials.Bundle
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
// StatsHandlers stores the handler for stats.
StatsHandlers []stats.Handler
// InitialWindowSize sets the initial window size for a stream.
InitialWindowSize int32
// InitialConnWindowSize sets the initial window size for a connection.
Expand Down