Skip to content

Commit

Permalink
comments etc
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Feb 11, 2022
1 parent 535e619 commit 07ec1d6
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 84 deletions.
5 changes: 1 addition & 4 deletions balancer/grpclb/grpclb_remote_balancer.go
Expand Up @@ -35,7 +35,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -240,9 +239,7 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() {
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`))
dopts = append(dopts, grpc.WithResolvers(lb.manualResolver))
if channelz.IsOn() {
dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
}
dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))

// Enable Keepalive for grpclb client.
dopts = append(dopts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Expand Down
3 changes: 2 additions & 1 deletion channelz/channelz.go
Expand Up @@ -31,5 +31,6 @@ package channelz

import "google.golang.org/grpc/internal/channelz"

// TODO(easwars): Add comment.
// Identifier is an opaque identifier which uniquely identifies an entity in the
// channelz database.
type Identifier = channelz.Identifier
5 changes: 3 additions & 2 deletions clientconn.go
Expand Up @@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -1320,7 +1321,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
if err != nil {
// newTr is either nil, or closed.
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err)
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", pretty.ToJSON(addr), err)
return err
}

Expand All @@ -1333,7 +1334,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
newTr.Close(transport.ErrConnClosing)
if connectCtx.Err() == context.DeadlineExceeded {
err := errors.New("failed to receive server preface within timeout")
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err)
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s: %v", pretty.ToJSON(addr), err)
return err
}
return nil
Expand Down
13 changes: 13 additions & 0 deletions internal/channelz/id.go
Expand Up @@ -20,34 +20,45 @@ package channelz

import "fmt"

// Identifier is an opaque identifier which uniquely identifies an entity in the
// channelz database.
type Identifier struct {
typ RefChannelType
id int64
str string
pid *Identifier
}

// Type returns the entity type corresponding to id.
func (id *Identifier) Type() RefChannelType {
if id == nil {
return RefUnknown
}
return id.typ
}

// Int returns the integer identifier corresponding to id.
func (id *Identifier) Int() int64 {
if id == nil {
return 0
}
return id.id
}

// String returns a string representation of the entity corresponding to id.

// This includes some information about the parent as well. Examples:
// Top-level channel: [Channel #channel-number]
// Nested channel: [Channel #parent-channel-number Channel #channel-number]
// Sub channel: [Channel #parent-channel SubChannel #subchannel-number]
func (id *Identifier) String() string {
if id == nil {
return ""
}
return id.str
}

// Equal returns true if other is the same as id.
func (id *Identifier) Equal(other *Identifier) bool {
if (id != nil) != (other != nil) {
return false
Expand All @@ -58,6 +69,8 @@ func (id *Identifier) Equal(other *Identifier) bool {
return id.typ == other.typ && id.id == other.id && id.pid == other.pid
}

// NewIdentifierForTesting returns a new opaque identifier to be used only for
// testing purposes.
func NewIdentifierForTesting(typ RefChannelType, id int64, pid *Identifier) *Identifier {
return newIdentifer(typ, id, pid)
}
Expand Down
16 changes: 10 additions & 6 deletions internal/channelz/logging.go
Expand Up @@ -26,9 +26,13 @@ import (

var logger = grpclog.Component("channelz")

func withParens(id *Identifier) string {
return "[" + id.String() + "] "
}

// Info logs and adds a trace event if channelz is on.
func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
msg := "[" + id.String() + "] " + fmt.Sprint(args...)
msg := withParens(id) + fmt.Sprint(args...)
if IsOn() {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: msg,
Expand All @@ -41,7 +45,7 @@ func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {

// Infof logs and adds a trace event if channelz is on.
func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) {
msg := "[" + id.String() + "] " + fmt.Sprintf(format, args...)
msg := withParens(id) + fmt.Sprintf(format, args...)
if IsOn() {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: msg,
Expand All @@ -54,7 +58,7 @@ func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...inter

// Warning logs and adds a trace event if channelz is on.
func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
msg := "[" + id.String() + "] " + fmt.Sprint(args...)
msg := withParens(id) + fmt.Sprint(args...)
if IsOn() {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: msg,
Expand All @@ -67,7 +71,7 @@ func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {

// Warningf logs and adds a trace event if channelz is on.
func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) {
msg := "[" + id.String() + "] " + fmt.Sprintf(format, args...)
msg := withParens(id) + fmt.Sprintf(format, args...)
if IsOn() {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: msg,
Expand All @@ -80,7 +84,7 @@ func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...in

// Error logs and adds a trace event if channelz is on.
func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
msg := "[" + id.String() + "] " + fmt.Sprint(args...)
msg := withParens(id) + fmt.Sprint(args...)
if IsOn() {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: msg,
Expand All @@ -93,7 +97,7 @@ func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {

// Errorf logs and adds a trace event if channelz is on.
func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) {
msg := "[" + id.String() + "] " + fmt.Sprintf(format, args...)
msg := withParens(id) + fmt.Sprintf(format, args...)
if IsOn() {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: msg,
Expand Down
15 changes: 4 additions & 11 deletions internal/transport/http2_client.go
Expand Up @@ -351,14 +351,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
t.statsHandler.HandleConn(t.ctx, connBegin)
}
if channelz.IsOn() {
var err error
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
if err != nil {
// TODO(easwars): See if you need to return a more meaningful error.
// TODO(easwars): Also, check if the transport needs to be closed before returning.
return nil, err
}
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
if err != nil {
return nil, err
}
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
Expand Down Expand Up @@ -904,9 +899,7 @@ func (t *http2Client) Close(err error) {
t.controlBuf.finish()
t.cancel()
t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
channelz.RemoveEntry(t.channelzID)
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
Expand Down
15 changes: 4 additions & 11 deletions internal/transport/http2_server.go
Expand Up @@ -275,14 +275,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
}
if channelz.IsOn() {
var err error
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
if err != nil {
// TODO(easwars): See if you need to return a more meaningful error.
// TODO(easwars): Also, check if the transport needs to be closed before returning.
return nil, err
}
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
if err != nil {
return nil, err
}

t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
Expand Down Expand Up @@ -1215,9 +1210,7 @@ func (t *http2Server) Close() {
if err := t.conn.Close(); err != nil && logger.V(logLevel) {
logger.Infof("transport: error closing conn during Close: %v", err)
}
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
channelz.RemoveEntry(t.channelzID)
// Cancel all active streams.
for _, s := range streams {
s.cancel()
Expand Down
84 changes: 35 additions & 49 deletions server.go
Expand Up @@ -584,9 +584,7 @@ func NewServer(opt ...ServerOption) *Server {
s.initServerWorkers()
}

if channelz.IsOn() {
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
}
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
return s
}

Expand Down Expand Up @@ -724,9 +722,7 @@ func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {

func (l *listenSocket) Close() error {
err := l.Listener.Close()
if channelz.IsOn() {
channelz.RemoveEntry(l.channelzID)
}
channelz.RemoveEntry(l.channelzID)
return err
}

Expand All @@ -737,52 +733,50 @@ func (l *listenSocket) Close() error {
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
s.serve = true
if s.lis == nil {
// Serve called after Stop or GracefulStop.
s.mu.Unlock()
lis.Close()
return ErrServerStopped
}
if err := func() error { // Anonymous func to be able to defer the unlock.
s.mu.Lock()
defer s.mu.Unlock()

s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
if s.quit.HasFired() {
// Stop or GracefulStop called; block until done and return nil.
<-s.done.Done()
s.printf("serving")
s.serve = true
if s.lis == nil {
lis.Close()
return ErrServerStopped
}
}()

ls := &listenSocket{Listener: lis}
s.lis[ls] = true
s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
if s.quit.HasFired() {
// Stop or GracefulStop called; block until done and return nil.
<-s.done.Done()
}
}()

ls := &listenSocket{Listener: lis}
s.lis[ls] = true

if channelz.IsOn() {
var err error
ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
if err != nil {
// TODO(easwars): See if you can defer this unlock somehow.
s.mu.Unlock()
lis.Close()
// TODO(easwars): return a more meaningful error.
return err
}
}
s.mu.Unlock()

defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[ls] {
ls.Close()
delete(s.lis, ls)
}
s.mu.Unlock()
}()
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[ls] {
ls.Close()
delete(s.lis, ls)
}
s.mu.Unlock()
}()
return nil
}(); err != nil {
return err
}

var tempDelay time.Duration // how long to sleep on accept failure

for {
rawConn, err := lis.Accept()
if err != nil {
Expand Down Expand Up @@ -1717,11 +1711,7 @@ func (s *Server) Stop() {
s.done.Fire()
}()

s.channelzRemoveOnce.Do(func() {
if channelz.IsOn() {
channelz.RemoveEntry(s.channelzID)
}
})
s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })

s.mu.Lock()
listeners := s.lis
Expand Down Expand Up @@ -1759,11 +1749,7 @@ func (s *Server) GracefulStop() {
s.quit.Fire()
defer s.done.Fire()

s.channelzRemoveOnce.Do(func() {
if channelz.IsOn() {
channelz.RemoveEntry(s.channelzID)
}
})
s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
s.mu.Lock()
if s.conns == nil {
s.mu.Unlock()
Expand Down

0 comments on commit 07ec1d6

Please sign in to comment.