From 606f3e1310f9e5131dd70ad2385c00e960b3cdb5 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 10 Feb 2022 11:24:17 -0800 Subject: [PATCH] include entity identifier in channelz logs --- balancer/balancer.go | 3 +- balancer/grpclb/grpclb_remote_balancer.go | 5 +- balancer_conn_wrappers.go | 1 + channelz/channelz.go | 36 +++ channelz/service/service_sktopt_test.go | 23 +- channelz/service/service_test.go | 221 ++++++++++-------- clientconn.go | 111 ++++----- dialoptions.go | 5 +- internal/balancergroup/balancergroup_test.go | 9 +- internal/channelz/funcs.go | 141 +++++++---- internal/channelz/id.go | 84 +++++++ internal/channelz/logging.go | 37 +-- internal/channelz/types.go | 23 +- internal/transport/http2_client.go | 11 +- internal/transport/http2_server.go | 12 +- internal/transport/transport.go | 5 +- resolver_conn_wrapper.go | 23 +- server.go | 90 ++++--- test/channelz_test.go | 72 +++--- .../clustermanager/clustermanager_test.go | 4 +- 20 files changed, 569 insertions(+), 347 deletions(-) create mode 100644 channelz/channelz.go create mode 100644 internal/channelz/id.go diff --git a/balancer/balancer.go b/balancer/balancer.go index bcc6f5451c90..f7a7697cad02 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -27,6 +27,7 @@ import ( "net" "strings" + "google.golang.org/grpc/channelz" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" @@ -192,7 +193,7 @@ type BuildOptions struct { // server can ignore this field. Authority string // ChannelzParentID is the parent ClientConn's channelz ID. - ChannelzParentID int64 + ChannelzParentID *channelz.Identifier // CustomUserAgent is the custom user agent set on the parent ClientConn. // The balancer should set the same custom user agent if it creates a // ClientConn. diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 805bbbb789ae..dab1959418e1 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -35,7 +35,6 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/backoff" - "google.golang.org/grpc/internal/channelz" imetadata "google.golang.org/grpc/internal/metadata" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" @@ -240,9 +239,7 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() { // Explicitly set pickfirst as the balancer. dopts = append(dopts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`)) dopts = append(dopts, grpc.WithResolvers(lb.manualResolver)) - if channelz.IsOn() { - dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID)) - } + dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID)) // Enable Keepalive for grpclb client. dopts = append(dopts, grpc.WithKeepaliveParams(keepalive.ClientParameters{ diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index f4ea61746823..5eb87a552036 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -184,6 +184,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer } ac, err := ccb.cc.newAddrConn(addrs, opts) if err != nil { + channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) return nil, err } acbw := &acBalancerWrapper{ac: ac} diff --git a/channelz/channelz.go b/channelz/channelz.go new file mode 100644 index 000000000000..a220c47c59a5 --- /dev/null +++ b/channelz/channelz.go @@ -0,0 +1,36 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package channelz exports internals of the channelz implementation as required +// by other gRPC packages. +// +// The implementation of the channelz spec as defined in +// https://github.com/grpc/proposal/blob/master/A14-channelz.md, is provided by +// the `internal/channelz` package. +// +// Experimental +// +// Notice: All APIs in this package are experimental and may be removed in a +// later release. +package channelz + +import "google.golang.org/grpc/internal/channelz" + +// Identifier is an opaque identifier which uniquely identifies an entity in the +// channelz database. +type Identifier = channelz.Identifier diff --git a/channelz/service/service_sktopt_test.go b/channelz/service/service_sktopt_test.go index efd383fce3c8..7302d9105a4c 100644 --- a/channelz/service/service_sktopt_test.go +++ b/channelz/service/service_sktopt_test.go @@ -28,15 +28,17 @@ package service import ( "context" - "reflect" "strconv" "testing" "github.com/golang/protobuf/ptypes" durpb "github.com/golang/protobuf/ptypes/duration" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "golang.org/x/sys/unix" channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" "google.golang.org/grpc/internal/channelz" + "google.golang.org/protobuf/testing/protocmp" ) func init() { @@ -139,20 +141,27 @@ func (s) TestGetSocketOptions(t *testing.T) { }, } svr := newCZServer() - ids := make([]int64, len(ss)) + ids := make([]*channelz.Identifier, len(ss)) svrID := channelz.RegisterServer(&dummyServer{}, "") defer channelz.RemoveEntry(svrID) for i, s := range ss { - ids[i] = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i)) + ids[i], _ = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i)) defer channelz.RemoveEntry(ids[i]) } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() for i, s := range ss { - resp, _ := svr.GetSocket(ctx, &channelzpb.GetSocketRequest{SocketId: ids[i]}) - metrics := resp.GetSocket() - if !reflect.DeepEqual(metrics.GetRef(), &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}) || !reflect.DeepEqual(socketProtoToStruct(metrics), s) { - t.Fatalf("resp.GetSocket() want: metrics.GetRef() = %#v and %#v, got: metrics.GetRef() = %#v and %#v", &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}, s, metrics.GetRef(), socketProtoToStruct(metrics)) + resp, _ := svr.GetSocket(ctx, &channelzpb.GetSocketRequest{SocketId: ids[i].Int()}) + got, want := resp.GetSocket().GetRef(), &channelzpb.SocketRef{SocketId: ids[i].Int(), Name: strconv.Itoa(i)} + if !cmp.Equal(got, want, cmpopts.IgnoreUnexported(channelzpb.SocketRef{})) { + t.Fatalf("resp.GetSocket() returned metrics.GetRef() = %#v, want %#v", got, want) + } + socket, err := socketProtoToStruct(resp.GetSocket()) + if err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(s, socket, protocmp.Transform(), cmp.AllowUnexported(dummySocket{})); diff != "" { + t.Fatalf("unexpected socket, diff (-want +got):\n%s", diff) } } } diff --git a/channelz/service/service_test.go b/channelz/service/service_test.go index 17409533745b..6b05aa0c8524 100644 --- a/channelz/service/service_test.go +++ b/channelz/service/service_test.go @@ -22,18 +22,21 @@ import ( "context" "fmt" "net" - "reflect" "strconv" + "strings" "testing" "time" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpctest" + "google.golang.org/protobuf/testing/protocmp" ) func init() { @@ -61,14 +64,6 @@ type protoToSocketOptFunc func([]*channelzpb.SocketOption) *channelz.SocketOptio // It is only defined under linux environment on x86 architecture. var protoToSocketOpt protoToSocketOptFunc -// emptyTime is used for detecting unset value of time.Time type. -// For go1.7 and earlier, ptypes.Timestamp will fill in the loc field of time.Time -// with &utcLoc. However zero value of a time.Time type value loc field is nil. -// This behavior will make reflect.DeepEqual fail upon unset time.Time field, -// and cause false positive fatal error. -// TODO: Go1.7 is no longer supported - does this need a change? -var emptyTime time.Time - const defaultTestTimeout = 10 * time.Second type dummyChannel struct { @@ -149,7 +144,7 @@ func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric { } } -func channelProtoToStruct(c *channelzpb.Channel) *dummyChannel { +func channelProtoToStruct(c *channelzpb.Channel) (*dummyChannel, error) { dc := &dummyChannel{} pdata := c.GetData() switch pdata.GetState().GetState() { @@ -170,29 +165,29 @@ func channelProtoToStruct(c *channelzpb.Channel) *dummyChannel { dc.callsStarted = pdata.CallsStarted dc.callsSucceeded = pdata.CallsSucceeded dc.callsFailed = pdata.CallsFailed - if t, err := ptypes.Timestamp(pdata.GetLastCallStartedTimestamp()); err == nil { - if !t.Equal(emptyTime) { - dc.lastCallStartedTimestamp = t - } + ts, err := ptypes.Timestamp(pdata.GetLastCallStartedTimestamp()) + if err != nil { + return nil, err } - return dc + dc.lastCallStartedTimestamp = ts + return dc, nil } -func serverProtoToStruct(s *channelzpb.Server) *dummyServer { +func serverProtoToStruct(s *channelzpb.Server) (*dummyServer, error) { ds := &dummyServer{} pdata := s.GetData() ds.callsStarted = pdata.CallsStarted ds.callsSucceeded = pdata.CallsSucceeded ds.callsFailed = pdata.CallsFailed - if t, err := ptypes.Timestamp(pdata.GetLastCallStartedTimestamp()); err == nil { - if !t.Equal(emptyTime) { - ds.lastCallStartedTimestamp = t - } + ts, err := ptypes.Timestamp(pdata.GetLastCallStartedTimestamp()) + if err != nil { + return nil, err } - return ds + ds.lastCallStartedTimestamp = ts + return ds, nil } -func socketProtoToStruct(s *channelzpb.Socket) *dummySocket { +func socketProtoToStruct(s *channelzpb.Socket) (*dummySocket, error) { ds := &dummySocket{} pdata := s.GetData() ds.streamsStarted = pdata.GetStreamsStarted() @@ -201,26 +196,26 @@ func socketProtoToStruct(s *channelzpb.Socket) *dummySocket { ds.messagesSent = pdata.GetMessagesSent() ds.messagesReceived = pdata.GetMessagesReceived() ds.keepAlivesSent = pdata.GetKeepAlivesSent() - if t, err := ptypes.Timestamp(pdata.GetLastLocalStreamCreatedTimestamp()); err == nil { - if !t.Equal(emptyTime) { - ds.lastLocalStreamCreatedTimestamp = t - } - } - if t, err := ptypes.Timestamp(pdata.GetLastRemoteStreamCreatedTimestamp()); err == nil { - if !t.Equal(emptyTime) { - ds.lastRemoteStreamCreatedTimestamp = t - } - } - if t, err := ptypes.Timestamp(pdata.GetLastMessageSentTimestamp()); err == nil { - if !t.Equal(emptyTime) { - ds.lastMessageSentTimestamp = t - } - } - if t, err := ptypes.Timestamp(pdata.GetLastMessageReceivedTimestamp()); err == nil { - if !t.Equal(emptyTime) { - ds.lastMessageReceivedTimestamp = t - } - } + ts, err := ptypes.Timestamp(pdata.GetLastLocalStreamCreatedTimestamp()) + if err != nil { + return nil, err + } + ds.lastLocalStreamCreatedTimestamp = ts + ts, err = ptypes.Timestamp(pdata.GetLastRemoteStreamCreatedTimestamp()) + if err != nil { + return nil, err + } + ds.lastRemoteStreamCreatedTimestamp = ts + ts, err = ptypes.Timestamp(pdata.GetLastMessageSentTimestamp()) + if err != nil { + return nil, err + } + ds.lastMessageSentTimestamp = ts + ts, err = ptypes.Timestamp(pdata.GetLastMessageReceivedTimestamp()) + if err != nil { + return nil, err + } + ds.lastMessageReceivedTimestamp = ts if v := pdata.GetLocalFlowControlWindow(); v != nil { ds.localFlowControlWindow = v.Value } @@ -240,7 +235,7 @@ func socketProtoToStruct(s *channelzpb.Socket) *dummySocket { ds.remoteAddr = protoToAddr(remote) } ds.remoteName = s.GetRemoteName() - return ds + return ds, nil } func protoToSecurity(protoSecurity *channelzpb.Security) credentials.ChannelzSecurityValue { @@ -325,7 +320,7 @@ func (s) TestGetTopChannels(t *testing.T) { czCleanup := channelz.NewChannelzStorageForTesting() defer cleanupWrapper(czCleanup, t) for _, c := range tcs { - id := channelz.RegisterChannel(c, 0, "") + id := channelz.RegisterChannel(c, nil, "") defer channelz.RemoveEntry(id) } s := newCZServer() @@ -336,12 +331,16 @@ func (s) TestGetTopChannels(t *testing.T) { t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd()) } for i, c := range resp.GetChannel() { - if !reflect.DeepEqual(channelProtoToStruct(c), tcs[i]) { - t.Fatalf("dummyChannel: %d, want: %#v, got: %#v", i, tcs[i], channelProtoToStruct(c)) + channel, err := channelProtoToStruct(c) + if err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(tcs[i], channel, protocmp.Transform(), cmp.AllowUnexported(dummyChannel{})); diff != "" { + t.Fatalf("unexpected channel, diff (-want +got):\n%s", diff) } } for i := 0; i < 50; i++ { - id := channelz.RegisterChannel(tcs[0], 0, "") + id := channelz.RegisterChannel(tcs[0], nil, "") defer channelz.RemoveEntry(id) } resp, _ = s.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{StartChannelId: 0}) @@ -385,8 +384,12 @@ func (s) TestGetServers(t *testing.T) { t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd()) } for i, s := range resp.GetServer() { - if !reflect.DeepEqual(serverProtoToStruct(s), ss[i]) { - t.Fatalf("dummyServer: %d, want: %#v, got: %#v", i, ss[i], serverProtoToStruct(s)) + server, err := serverProtoToStruct(s) + if err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(ss[i], server, protocmp.Transform(), cmp.AllowUnexported(dummyServer{})); diff != "" { + t.Fatalf("unexpected server, diff (-want +got):\n%s", diff) } } for i := 0; i < 50; i++ { @@ -405,34 +408,34 @@ func (s) TestGetServerSockets(t *testing.T) { svrID := channelz.RegisterServer(&dummyServer{}, "") defer channelz.RemoveEntry(svrID) refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"} - ids := make([]int64, 3) - ids[0] = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0]) - ids[1] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1]) - ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2]) + ids := make([]*channelz.Identifier, 3) + ids[0], _ = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0]) + ids[1], _ = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1]) + ids[2], _ = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2]) for _, id := range ids { defer channelz.RemoveEntry(id) } svr := newCZServer() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: 0}) + resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID.Int(), StartSocketId: 0}) if !resp.GetEnd() { t.Fatalf("resp.GetEnd() want: true, got: %v", resp.GetEnd()) } // GetServerSockets only return normal sockets. want := map[int64]string{ - ids[1]: refNames[1], - ids[2]: refNames[2], + ids[1].Int(): refNames[1], + ids[2].Int(): refNames[2], } - if !reflect.DeepEqual(convertSocketRefSliceToMap(resp.GetSocketRef()), want) { + if !cmp.Equal(convertSocketRefSliceToMap(resp.GetSocketRef()), want) { t.Fatalf("GetServerSockets want: %#v, got: %#v", want, resp.GetSocketRef()) } for i := 0; i < 50; i++ { - id := channelz.RegisterNormalSocket(&dummySocket{}, svrID, "") + id, _ := channelz.RegisterNormalSocket(&dummySocket{}, svrID, "") defer channelz.RemoveEntry(id) } - resp, _ = svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: 0}) + resp, _ = svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID.Int(), StartSocketId: 0}) if resp.GetEnd() { t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd()) } @@ -446,10 +449,10 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) { svrID := channelz.RegisterServer(&dummyServer{}, "") defer channelz.RemoveEntry(svrID) refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"} - ids := make([]int64, 3) - ids[0] = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0]) - ids[1] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1]) - ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2]) + ids := make([]*channelz.Identifier, 3) + ids[0], _ = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0]) + ids[1], _ = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1]) + ids[2], _ = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2]) for _, id := range ids { defer channelz.RemoveEntry(id) } @@ -458,16 +461,16 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) { defer cancel() // Make GetServerSockets with startID = ids[1]+1, so socket-1 won't be // included in the response. - resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: ids[1] + 1}) + resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID.Int(), StartSocketId: ids[1].Int() + 1}) if !resp.GetEnd() { t.Fatalf("resp.GetEnd() want: true, got: %v", resp.GetEnd()) } // GetServerSockets only return normal socket-2, socket-1 should be // filtered by start ID. want := map[int64]string{ - ids[2]: refNames[2], + ids[2].Int(): refNames[2], } - if !reflect.DeepEqual(convertSocketRefSliceToMap(resp.GetSocketRef()), want) { + if !cmp.Equal(convertSocketRefSliceToMap(resp.GetSocketRef()), want) { t.Fatalf("GetServerSockets want: %#v, got: %#v", want, resp.GetSocketRef()) } } @@ -475,38 +478,45 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) { func (s) TestGetChannel(t *testing.T) { czCleanup := channelz.NewChannelzStorageForTesting() defer cleanupWrapper(czCleanup, t) + refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"} - ids := make([]int64, 4) - ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0]) + ids := make([]*channelz.Identifier, 4) + ids[0] = channelz.RegisterChannel(&dummyChannel{}, nil, refNames[0]) channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{ Desc: "Channel Created", Severity: channelz.CtInfo, }) + ids[1] = channelz.RegisterChannel(&dummyChannel{}, ids[0], refNames[1]) channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{ Desc: "Channel Created", Severity: channelz.CtInfo, Parent: &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1]), + Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1].Int()), Severity: channelz.CtInfo, }, }) - ids[2] = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[2]) + var err error + ids[2], err = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[2]) + if err != nil { + t.Fatalf("channelz.RegisterSubChannel() failed: %v", err) + } channelz.AddTraceEvent(logger, ids[2], 0, &channelz.TraceEventDesc{ Desc: "SubChannel Created", Severity: channelz.CtInfo, Parent: &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2]), + Desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2].Int()), Severity: channelz.CtInfo, }, }) + ids[3] = channelz.RegisterChannel(&dummyChannel{}, ids[1], refNames[3]) channelz.AddTraceEvent(logger, ids[3], 0, &channelz.TraceEventDesc{ Desc: "Channel Created", Severity: channelz.CtInfo, Parent: &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[3]), + Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[3].Int()), Severity: channelz.CtInfo, }, }) @@ -518,21 +528,23 @@ func (s) TestGetChannel(t *testing.T) { Desc: "Resolver returns an empty address list", Severity: channelz.CtWarning, }) + for _, id := range ids { defer channelz.RemoveEntry(id) } + svr := newCZServer() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - resp, _ := svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: ids[0]}) + resp, _ := svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: ids[0].Int()}) metrics := resp.GetChannel() subChans := metrics.GetSubchannelRef() - if len(subChans) != 1 || subChans[0].GetName() != refNames[2] || subChans[0].GetSubchannelId() != ids[2] { - t.Fatalf("metrics.GetSubChannelRef() want %#v, got %#v", []*channelzpb.SubchannelRef{{SubchannelId: ids[2], Name: refNames[2]}}, subChans) + if len(subChans) != 1 || subChans[0].GetName() != refNames[2] || subChans[0].GetSubchannelId() != ids[2].Int() { + t.Fatalf("metrics.GetSubChannelRef() want %#v, got %#v", []*channelzpb.SubchannelRef{{SubchannelId: ids[2].Int(), Name: refNames[2]}}, subChans) } nestedChans := metrics.GetChannelRef() - if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[1] || nestedChans[0].GetChannelId() != ids[1] { - t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[1], Name: refNames[1]}}, nestedChans) + if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[1] || nestedChans[0].GetChannelId() != ids[1].Int() { + t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[1].Int(), Name: refNames[1]}}, nestedChans) } trace := metrics.GetData().GetTrace() want := []struct { @@ -542,14 +554,14 @@ func (s) TestGetChannel(t *testing.T) { childRef string }{ {desc: "Channel Created", severity: channelzpb.ChannelTraceEvent_CT_INFO}, - {desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1]), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[1], childRef: refNames[1]}, - {desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2]), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[2], childRef: refNames[2]}, + {desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1].Int()), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[1].Int(), childRef: refNames[1]}, + {desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2].Int()), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[2].Int(), childRef: refNames[2]}, {desc: fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready), severity: channelzpb.ChannelTraceEvent_CT_INFO}, {desc: "Resolver returns an empty address list", severity: channelzpb.ChannelTraceEvent_CT_WARNING}, } for i, e := range trace.Events { - if e.GetDescription() != want[i].desc { + if !strings.Contains(e.GetDescription(), want[i].desc) { t.Fatalf("trace: GetDescription want %#v, got %#v", want[i].desc, e.GetDescription()) } if e.GetSeverity() != want[i].severity { @@ -564,11 +576,11 @@ func (s) TestGetChannel(t *testing.T) { } } } - resp, _ = svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: ids[1]}) + resp, _ = svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: ids[1].Int()}) metrics = resp.GetChannel() nestedChans = metrics.GetChannelRef() - if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[3] || nestedChans[0].GetChannelId() != ids[3] { - t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[3], Name: refNames[3]}}, nestedChans) + if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[3] || nestedChans[0].GetChannelId() != ids[3].Int() { + t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[3].Int(), Name: refNames[3]}}, nestedChans) } } @@ -581,23 +593,27 @@ func (s) TestGetSubChannel(t *testing.T) { czCleanup := channelz.NewChannelzStorageForTesting() defer cleanupWrapper(czCleanup, t) refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"} - ids := make([]int64, 4) - ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0]) + ids := make([]*channelz.Identifier, 4) + ids[0] = channelz.RegisterChannel(&dummyChannel{}, nil, refNames[0]) channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{ Desc: "Channel Created", Severity: channelz.CtInfo, }) - ids[1] = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[1]) + var err error + ids[1], err = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[1]) + if err != nil { + t.Fatalf("channelz.RegisterSubChannel() failed: %v", err) + } channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{ Desc: subchanCreated, Severity: channelz.CtInfo, Parent: &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[0]), + Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[0].Int()), Severity: channelz.CtInfo, }, }) - ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[2]) - ids[3] = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[3]) + ids[2], _ = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[2]) + ids[3], _ = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[3]) channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{ Desc: subchanConnectivityChange, Severity: channelz.CtInfo, @@ -612,13 +628,13 @@ func (s) TestGetSubChannel(t *testing.T) { svr := newCZServer() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - resp, _ := svr.GetSubchannel(ctx, &channelzpb.GetSubchannelRequest{SubchannelId: ids[1]}) + resp, _ := svr.GetSubchannel(ctx, &channelzpb.GetSubchannelRequest{SubchannelId: ids[1].Int()}) metrics := resp.GetSubchannel() want := map[int64]string{ - ids[2]: refNames[2], - ids[3]: refNames[3], + ids[2].Int(): refNames[2], + ids[3].Int(): refNames[3], } - if !reflect.DeepEqual(convertSocketRefSliceToMap(metrics.GetSocketRef()), want) { + if !cmp.Equal(convertSocketRefSliceToMap(metrics.GetSocketRef()), want) { t.Fatalf("metrics.GetSocketRef() want %#v: got: %#v", want, metrics.GetSocketRef()) } @@ -726,20 +742,27 @@ func (s) TestGetSocket(t *testing.T) { }, } svr := newCZServer() - ids := make([]int64, len(ss)) + ids := make([]*channelz.Identifier, len(ss)) svrID := channelz.RegisterServer(&dummyServer{}, "") defer channelz.RemoveEntry(svrID) for i, s := range ss { - ids[i] = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i)) + ids[i], _ = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i)) defer channelz.RemoveEntry(ids[i]) } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() for i, s := range ss { - resp, _ := svr.GetSocket(ctx, &channelzpb.GetSocketRequest{SocketId: ids[i]}) - metrics := resp.GetSocket() - if !reflect.DeepEqual(metrics.GetRef(), &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}) || !reflect.DeepEqual(socketProtoToStruct(metrics), s) { - t.Fatalf("resp.GetSocket() want: metrics.GetRef() = %#v and %#v, got: metrics.GetRef() = %#v and %#v", &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}, s, metrics.GetRef(), socketProtoToStruct(metrics)) + resp, _ := svr.GetSocket(ctx, &channelzpb.GetSocketRequest{SocketId: ids[i].Int()}) + got, want := resp.GetSocket().GetRef(), &channelzpb.SocketRef{SocketId: ids[i].Int(), Name: strconv.Itoa(i)} + if !cmp.Equal(got, want, cmpopts.IgnoreUnexported(channelzpb.SocketRef{})) { + t.Fatalf("resp.GetSocket() returned metrics.GetRef() = %#v, want %#v", got, want) + } + socket, err := socketProtoToStruct(resp.GetSocket()) + if err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(s, socket, protocmp.Transform(), cmp.AllowUnexported(dummySocket{})); diff != "" { + t.Fatalf("unexpected socket, diff (-want +got):\n%s", diff) } } } diff --git a/clientconn.go b/clientconn.go index f9af78913710..f4420e7c3cb6 100644 --- a/clientconn.go +++ b/clientconn.go @@ -38,6 +38,7 @@ import ( "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/pretty" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/keepalive" @@ -159,23 +160,20 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } }() - if channelz.IsOn() { - if cc.dopts.channelzParentID != 0 { - cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target) - channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{ - Desc: "Channel Created", - Severity: channelz.CtInfo, - Parent: &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID), - Severity: channelz.CtInfo, - }, - }) - } else { - cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target) - channelz.Info(logger, cc.channelzID, "Channel Created") + pid := cc.dopts.channelzParentID + cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, pid, target) + ted := &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Channel(id:%d) created", cc.channelzID.Int()), + Severity: channelz.CtInfo, + } + if cc.dopts.channelzParentID != nil { + ted.Parent = &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID.Int()), + Severity: channelz.CtInfo, } - cc.csMgr.channelzID = cc.channelzID } + channelz.AddTraceEvent(logger, cc.channelzID, 1, ted) + cc.csMgr.channelzID = cc.channelzID if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil { return nil, errNoTransportSecurity @@ -398,7 +396,7 @@ type connectivityStateManager struct { mu sync.Mutex state connectivity.State notifyChan chan struct{} - channelzID int64 + channelzID *channelz.Identifier } // updateState updates the connectivity.State of ClientConn. @@ -490,7 +488,7 @@ type ClientConn struct { firstResolveEvent *grpcsync.Event - channelzID int64 // channelz unique identification number + channelzID *channelz.Identifier czData *channelzData lceMu sync.Mutex // protects lastConnectionError @@ -768,17 +766,21 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub cc.mu.Unlock() return nil, ErrClientConnClosing } - if channelz.IsOn() { - ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "") - channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ - Desc: "Subchannel Created", - Severity: channelz.CtInfo, - Parent: &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID), - Severity: channelz.CtInfo, - }, - }) + + var err error + ac.channelzID, err = channelz.RegisterSubChannel(ac, cc.channelzID, "") + if err != nil { + return nil, err } + channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID.Int()), + Severity: channelz.CtInfo, + Parent: &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID.Int()), + Severity: channelz.CtInfo, + }, + }) + cc.conns[ac] = struct{}{} cc.mu.Unlock() return ac, nil @@ -1085,22 +1087,22 @@ func (cc *ClientConn) Close() error { for ac := range conns { ac.tearDown(ErrClientConnClosing) } - if channelz.IsOn() { - ted := &channelz.TraceEventDesc{ - Desc: "Channel Deleted", + ted := &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Channel(id:%d) deleted", cc.channelzID.Int()), + Severity: channelz.CtInfo, + } + if cc.dopts.channelzParentID != nil { + ted.Parent = &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID.Int()), Severity: channelz.CtInfo, } - if cc.dopts.channelzParentID != 0 { - ted.Parent = &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID), - Severity: channelz.CtInfo, - } - } - channelz.AddTraceEvent(logger, cc.channelzID, 0, ted) - // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to - // the entity being deleted, and thus prevent it from being deleted right away. - channelz.RemoveEntry(cc.channelzID) } + channelz.AddTraceEvent(logger, cc.channelzID, 0, ted) + // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add + // trace reference to the entity being deleted, and thus prevent it from being + // deleted right away. + channelz.RemoveEntry(cc.channelzID) + return nil } @@ -1130,7 +1132,7 @@ type addrConn struct { backoffIdx int // Needs to be stateful for resetConnectBackoff. resetBackoff chan struct{} - channelzID int64 // channelz unique identification number. + channelzID *channelz.Identifier czData *channelzData } @@ -1319,7 +1321,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose) if err != nil { // newTr is either nil, or closed. - channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err) + channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", pretty.ToJSON(addr), err) return err } @@ -1332,7 +1334,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne newTr.Close(transport.ErrConnClosing) if connectCtx.Err() == context.DeadlineExceeded { err := errors.New("failed to receive server preface within timeout") - channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err) + channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s: %v", pretty.ToJSON(addr), err) return err } return nil @@ -1497,19 +1499,18 @@ func (ac *addrConn) tearDown(err error) { curTr.GracefulClose() ac.mu.Lock() } - if channelz.IsOn() { - channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ - Desc: "Subchannel Deleted", + channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelzID.Int()), + Severity: channelz.CtInfo, + Parent: &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelzID.Int()), Severity: channelz.CtInfo, - Parent: &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID), - Severity: channelz.CtInfo, - }, - }) - // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to - // the entity being deleted, and thus prevent it from being deleted right away. - channelz.RemoveEntry(ac.channelzID) - } + }, + }) + // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add + // trace reference to the entity being deleted, and thus prevent it from + // being deleted right away. + channelz.RemoveEntry(ac.channelzID) ac.mu.Unlock() } diff --git a/dialoptions.go b/dialoptions.go index c4bf09f9e940..bdfc200e3bb2 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc/backoff" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/channelz" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" @@ -57,7 +58,7 @@ type dialOptions struct { callOptions []CallOption // This is used by WithBalancerName dial option. balancerBuilder balancer.Builder - channelzParentID int64 + channelzParentID *channelz.Identifier disableServiceConfig bool disableRetry bool disableHealthCheck bool @@ -498,7 +499,7 @@ func WithAuthority(a string) DialOption { // // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. -func WithChannelzParentID(id int64) DialOption { +func WithChannelzParentID(id *channelz.Identifier) DialOption { return newFuncDialOption(func(o *dialOptions) { o.channelzParentID = id }) diff --git a/internal/balancergroup/balancergroup_test.go b/internal/balancergroup/balancergroup_test.go index 4942f8a7da87..d8a5a1c19b86 100644 --- a/internal/balancergroup/balancergroup_test.go +++ b/internal/balancergroup/balancergroup_test.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/balancer/stub" + "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" @@ -474,17 +475,15 @@ func (s) TestBalancerGroup_CloseStopsBalancerInCache(t *testing.T) { // to the balancergroup at creation time is passed to child policies. func (s) TestBalancerGroupBuildOptions(t *testing.T) { const ( - balancerName = "stubBalancer-TestBalancerGroupBuildOptions" - parent = int64(1234) - userAgent = "ua" - defaultTestTimeout = 1 * time.Second + balancerName = "stubBalancer-TestBalancerGroupBuildOptions" + userAgent = "ua" ) // Setup the stub balancer such that we can read the build options passed to // it in the UpdateClientConnState method. bOpts := balancer.BuildOptions{ DialCreds: insecure.NewCredentials(), - ChannelzParentID: parent, + ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefChannel, 1234, nil), CustomUserAgent: userAgent, } stub.Register(balancerName, stub.BalancerFuncs{ diff --git a/internal/channelz/funcs.go b/internal/channelz/funcs.go index ea660a147cf9..0403cf5f78a6 100644 --- a/internal/channelz/funcs.go +++ b/internal/channelz/funcs.go @@ -25,6 +25,7 @@ package channelz import ( "context" + "errors" "fmt" "sort" "sync" @@ -184,36 +185,53 @@ func GetServer(id int64) *ServerMetric { return db.get().GetServer(id) } -// RegisterChannel registers the given channel c in channelz database with ref -// as its reference name, and add it to the child list of its parent (identified -// by pid). pid = 0 means no parent. It returns the unique channelz tracking id -// assigned to this channel. -func RegisterChannel(c Channel, pid int64, ref string) int64 { +// RegisterChannel registers the given channel c in the channelz database with +// ref as its reference name, and adds it to the child list of its parent +// (identified by pid). pid == nil means no parent. +// +// Returns a unique channelz identifier assigned to this channel. +// +// If channelz is not turned ON, this function is a no-op. +func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier { + if !IsOn() { + return nil + } + id := idGen.genID() + parent := int64(0) + isTopChannel := true + if pid != nil { + isTopChannel = false + parent = pid.Int() + } + cn := &channel{ refName: ref, c: c, subChans: make(map[int64]string), nestedChans: make(map[int64]string), id: id, - pid: pid, + pid: parent, trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, } - if pid == 0 { - db.get().addChannel(id, cn, true, pid) - } else { - db.get().addChannel(id, cn, false, pid) - } - return id + db.get().addChannel(id, cn, isTopChannel, parent) + return newIdentifer(RefChannel, id, pid) } -// RegisterSubChannel registers the given channel c in channelz database with ref -// as its reference name, and add it to the child list of its parent (identified -// by pid). It returns the unique channelz tracking id assigned to this subchannel. -func RegisterSubChannel(c Channel, pid int64, ref string) int64 { - if pid == 0 { - logger.Error("a SubChannel's parent id cannot be 0") - return 0 +// RegisterSubChannel registers the given subChannel c in the channelz database +// with ref as its reference name, and adds it to the child list of its parent +// (identified by pid). +// +// Returns a unique channelz identifier assigned to this subChannel. +// +// If channelz is not turned ON, this function is a no-op. +func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, error) { + if !IsOn() { + return nil, nil + } + + if pid == nil { + return nil, errors.New("a SubChannel's parent id cannot be nil") } id := idGen.genID() sc := &subChannel{ @@ -221,16 +239,22 @@ func RegisterSubChannel(c Channel, pid int64, ref string) int64 { c: c, sockets: make(map[int64]string), id: id, - pid: pid, + pid: pid.Int(), trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, } - db.get().addSubChannel(id, sc, pid) - return id + db.get().addSubChannel(id, sc, pid.Int()) + return newIdentifer(RefSubChannel, id, pid), nil } // RegisterServer registers the given server s in channelz database. It returns // the unique channelz tracking id assigned to this server. -func RegisterServer(s Server, ref string) int64 { +// +// If channelz is not turned ON, this function is a no-op. +func RegisterServer(s Server, ref string) *Identifier { + if !IsOn() { + return nil + } + id := idGen.genID() svr := &server{ refName: ref, @@ -240,57 +264,76 @@ func RegisterServer(s Server, ref string) int64 { id: id, } db.get().addServer(id, svr) - return id + return newIdentifer(RefServer, id, nil) } // RegisterListenSocket registers the given listen socket s in channelz database // with ref as its reference name, and add it to the child list of its parent // (identified by pid). It returns the unique channelz tracking id assigned to // this listen socket. -func RegisterListenSocket(s Socket, pid int64, ref string) int64 { - if pid == 0 { - logger.Error("a ListenSocket's parent id cannot be 0") - return 0 +// +// If channelz is not turned ON, this function is a no-op. +func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) { + if !IsOn() { + return nil, nil + } + + if pid == nil { + return nil, errors.New("a ListenSocket's parent id cannot be 0") } id := idGen.genID() - ls := &listenSocket{refName: ref, s: s, id: id, pid: pid} - db.get().addListenSocket(id, ls, pid) - return id + ls := &listenSocket{refName: ref, s: s, id: id, pid: pid.Int()} + db.get().addListenSocket(id, ls, pid.Int()) + return newIdentifer(RefListenSocket, id, pid), nil } // RegisterNormalSocket registers the given normal socket s in channelz database -// with ref as its reference name, and add it to the child list of its parent +// with ref as its reference name, and adds it to the child list of its parent // (identified by pid). It returns the unique channelz tracking id assigned to // this normal socket. -func RegisterNormalSocket(s Socket, pid int64, ref string) int64 { - if pid == 0 { - logger.Error("a NormalSocket's parent id cannot be 0") - return 0 +// +// If channelz is not turned ON, this function is a no-op. +func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) { + if !IsOn() { + return nil, nil + } + + if pid == nil { + return nil, errors.New("a NormalSocket's parent id cannot be 0") } id := idGen.genID() - ns := &normalSocket{refName: ref, s: s, id: id, pid: pid} - db.get().addNormalSocket(id, ns, pid) - return id + ns := &normalSocket{refName: ref, s: s, id: id, pid: pid.Int()} + db.get().addNormalSocket(id, ns, pid.Int()) + return newIdentifer(RefNormalSocket, id, pid), nil } // RemoveEntry removes an entry with unique channelz tracking id to be id from // channelz database. -func RemoveEntry(id int64) { - db.get().removeEntry(id) +// +// If channelz is not turned ON, this function is a no-op. +func RemoveEntry(id *Identifier) { + if !IsOn() { + return + } + db.get().removeEntry(id.Int()) } -// TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added -// to the channel trace. -// The Parent field is optional. It is used for event that will be recorded in the entity's parent -// trace also. +// TraceEventDesc is what the caller of AddTraceEvent should provide to describe +// the event to be added to the channel trace. +// +// The Parent field is optional. It is used for an event that will be recorded +// in the entity's parent trace. type TraceEventDesc struct { Desc string Severity Severity Parent *TraceEventDesc } -// AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc. -func AddTraceEvent(l grpclog.DepthLoggerV2, id int64, depth int, desc *TraceEventDesc) { +// AddTraceEvent adds trace related to the entity with specified id, using the +// provided TraceEventDesc. +// +// If channelz is not turned ON, this will simply log the event descriptions. +func AddTraceEvent(l grpclog.DepthLoggerV2, id *Identifier, depth int, desc *TraceEventDesc) { for d := desc; d != nil; d = d.Parent { switch d.Severity { case CtUnknown, CtInfo: @@ -304,7 +347,9 @@ func AddTraceEvent(l grpclog.DepthLoggerV2, id int64, depth int, desc *TraceEven if getMaxTraceEntry() == 0 { return } - db.get().traceEvent(id, desc) + if IsOn() { + db.get().traceEvent(id.Int(), desc) + } } // channelMap is the storage data structure for channelz. diff --git a/internal/channelz/id.go b/internal/channelz/id.go new file mode 100644 index 000000000000..286a8dff4298 --- /dev/null +++ b/internal/channelz/id.go @@ -0,0 +1,84 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import "fmt" + +// Identifier is an opaque identifier which uniquely identifies an entity in the +// channelz database. +type Identifier struct { + typ RefChannelType + id int64 + str string + pid *Identifier +} + +// Type returns the entity type corresponding to id. +func (id *Identifier) Type() RefChannelType { + if id == nil { + return RefUnknown + } + return id.typ +} + +// Int returns the integer identifier corresponding to id. +func (id *Identifier) Int() int64 { + if id == nil { + return 0 + } + return id.id +} + +// String returns a string representation of the entity corresponding to id. + +// This includes some information about the parent as well. Examples: +// Top-level channel: [Channel #channel-number] +// Nested channel: [Channel #parent-channel-number Channel #channel-number] +// Sub channel: [Channel #parent-channel SubChannel #subchannel-number] +func (id *Identifier) String() string { + if id == nil { + return "" + } + return id.str +} + +// Equal returns true if other is the same as id. +func (id *Identifier) Equal(other *Identifier) bool { + if (id != nil) != (other != nil) { + return false + } + if id == nil && other == nil { + return true + } + return id.typ == other.typ && id.id == other.id && id.pid == other.pid +} + +// NewIdentifierForTesting returns a new opaque identifier to be used only for +// testing purposes. +func NewIdentifierForTesting(typ RefChannelType, id int64, pid *Identifier) *Identifier { + return newIdentifer(typ, id, pid) +} + +func newIdentifer(typ RefChannelType, id int64, pid *Identifier) *Identifier { + str := fmt.Sprintf("%s #%d", typ, id) + if pid != nil { + str = fmt.Sprintf("%s %s", pid, str) + } + return &Identifier{typ: typ, id: id, str: str, pid: pid} +} diff --git a/internal/channelz/logging.go b/internal/channelz/logging.go index b0013f9c8865..028bc8ac4dc5 100644 --- a/internal/channelz/logging.go +++ b/internal/channelz/logging.go @@ -26,21 +26,26 @@ import ( var logger = grpclog.Component("channelz") +func withParens(id *Identifier) string { + return "[" + id.String() + "] " +} + // Info logs and adds a trace event if channelz is on. -func Info(l grpclog.DepthLoggerV2, id int64, args ...interface{}) { +func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) { + msg := withParens(id) + fmt.Sprint(args...) if IsOn() { AddTraceEvent(l, id, 1, &TraceEventDesc{ - Desc: fmt.Sprint(args...), + Desc: msg, Severity: CtInfo, }) } else { - l.InfoDepth(1, args...) + l.InfoDepth(1, msg) } } // Infof logs and adds a trace event if channelz is on. -func Infof(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) +func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) { + msg := withParens(id) + fmt.Sprintf(format, args...) if IsOn() { AddTraceEvent(l, id, 1, &TraceEventDesc{ Desc: msg, @@ -52,20 +57,21 @@ func Infof(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{} } // Warning logs and adds a trace event if channelz is on. -func Warning(l grpclog.DepthLoggerV2, id int64, args ...interface{}) { +func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) { + msg := withParens(id) + fmt.Sprint(args...) if IsOn() { AddTraceEvent(l, id, 1, &TraceEventDesc{ - Desc: fmt.Sprint(args...), + Desc: msg, Severity: CtWarning, }) } else { - l.WarningDepth(1, args...) + l.WarningDepth(1, msg) } } // Warningf logs and adds a trace event if channelz is on. -func Warningf(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) +func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) { + msg := withParens(id) + fmt.Sprintf(format, args...) if IsOn() { AddTraceEvent(l, id, 1, &TraceEventDesc{ Desc: msg, @@ -77,20 +83,21 @@ func Warningf(l grpclog.DepthLoggerV2, id int64, format string, args ...interfac } // Error logs and adds a trace event if channelz is on. -func Error(l grpclog.DepthLoggerV2, id int64, args ...interface{}) { +func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) { + msg := withParens(id) + fmt.Sprint(args...) if IsOn() { AddTraceEvent(l, id, 1, &TraceEventDesc{ - Desc: fmt.Sprint(args...), + Desc: msg, Severity: CtError, }) } else { - l.ErrorDepth(1, args...) + l.ErrorDepth(1, msg) } } // Errorf logs and adds a trace event if channelz is on. -func Errorf(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) +func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) { + msg := withParens(id) + fmt.Sprintf(format, args...) if IsOn() { AddTraceEvent(l, id, 1, &TraceEventDesc{ Desc: msg, diff --git a/internal/channelz/types.go b/internal/channelz/types.go index 3c595d154bd3..b6d592f38893 100644 --- a/internal/channelz/types.go +++ b/internal/channelz/types.go @@ -686,12 +686,33 @@ const ( type RefChannelType int const ( + // RefUnknown indicates an unknown entity type, the zero value for this type. + RefUnknown RefChannelType = iota // RefChannel indicates the referenced entity is a Channel. - RefChannel RefChannelType = iota + RefChannel // RefSubChannel indicates the referenced entity is a SubChannel. RefSubChannel + // RefServer indicates the referenced entity is a Server. + RefServer + // RefListenSocket indicates the referenced entity is a ListenSocket. + RefListenSocket + // RefNormalSocker indicates the referenced entity is a NormalSocket. + RefNormalSocket ) +var refChannelTypeToString = map[RefChannelType]string{ + RefUnknown: "Unknown", + RefChannel: "Channel", + RefSubChannel: "SubChannel", + RefServer: "Server", + RefListenSocket: "ListenSocket", + RefNormalSocket: "NormalSocket", +} + +func (r RefChannelType) String() string { + return refChannelTypeToString[r] +} + func (c *channelTrace) dumpData() *ChannelTrace { c.mu.Lock() ct := &ChannelTrace{EventNum: c.eventCount, CreationTime: c.createdTime} diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index f0c72d337105..38ed3d566fff 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -132,7 +132,7 @@ type http2Client struct { kpDormant bool // Fields below are for channelz metric collection. - channelzID int64 // channelz unique identification number + channelzID *channelz.Identifier czData *channelzData onGoAway func(GoAwayReason) @@ -351,8 +351,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts } t.statsHandler.HandleConn(t.ctx, connBegin) } - if channelz.IsOn() { - t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr)) + t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr)) + if err != nil { + return nil, err } if t.keepaliveEnabled { t.kpDormancyCond = sync.NewCond(&t.mu) @@ -898,9 +899,7 @@ func (t *http2Client) Close(err error) { t.controlBuf.finish() t.cancel() t.conn.Close() - if channelz.IsOn() { - channelz.RemoveEntry(t.channelzID) - } + channelz.RemoveEntry(t.channelzID) // Append info about previous goaways if there were any, since this may be important // for understanding the root cause for this connection to be closed. _, goAwayDebugMessage := t.GetGoAwayReason() diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 2c6eaf0e59cf..227608c7f21e 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -117,7 +117,7 @@ type http2Server struct { idle time.Time // Fields below are for channelz metric collection. - channelzID int64 // channelz unique identification number + channelzID *channelz.Identifier czData *channelzData bufferPool *bufferPool @@ -275,12 +275,12 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, connBegin := &stats.ConnBegin{} t.stats.HandleConn(t.ctx, connBegin) } - if channelz.IsOn() { - t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) + t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) + if err != nil { + return nil, err } t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1) - t.framer.writer.Flush() defer func() { @@ -1210,9 +1210,7 @@ func (t *http2Server) Close() { if err := t.conn.Close(); err != nil && logger.V(logLevel) { logger.Infof("transport: error closing conn during Close: %v", err) } - if channelz.IsOn() { - channelz.RemoveEntry(t.channelzID) - } + channelz.RemoveEntry(t.channelzID) // Cancel all active streams. for _, s := range streams { s.cancel() diff --git a/internal/transport/transport.go b/internal/transport/transport.go index d3bf65b2bdff..b0e1745d5e9d 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" @@ -529,7 +530,7 @@ type ServerConfig struct { InitialConnWindowSize int32 WriteBufferSize int ReadBufferSize int - ChannelzParentID int64 + ChannelzParentID *channelz.Identifier MaxHeaderListSize *uint32 HeaderTableSize *uint32 } @@ -563,7 +564,7 @@ type ConnectOptions struct { // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall. ReadBufferSize int // ChannelzParentID sets the addrConn id which initiate the creation of this client transport. - ChannelzParentID int64 + ChannelzParentID *channelz.Identifier // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received. MaxHeaderListSize *uint32 // UseProxy specifies if a proxy should be used. diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index 2c47cd54f07c..05a9d4e0bac0 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -19,7 +19,6 @@ package grpc import ( - "fmt" "strings" "sync" @@ -27,6 +26,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" ) @@ -97,10 +97,7 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { if ccr.done.HasFired() { return nil } - channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s) - if channelz.IsOn() { - ccr.addChannelzTraceEvent(s) - } + ccr.addChannelzTraceEvent(s) ccr.curState = s if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState { return balancer.ErrBadResolverState @@ -125,10 +122,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { if ccr.done.HasFired() { return } - channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending new addresses to cc: %v", addrs) - if channelz.IsOn() { - ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}) - } + ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}) ccr.curState.Addresses = addrs ccr.cc.updateResolverState(ccr.curState, nil) } @@ -141,7 +135,7 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { if ccr.done.HasFired() { return } - channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %v", sc) + channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %s", sc) if ccr.cc.dopts.disableServiceConfig { channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config") return @@ -151,9 +145,7 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err) return } - if channelz.IsOn() { - ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr}) - } + ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr}) ccr.curState.ServiceConfig = scpr ccr.cc.updateResolverState(ccr.curState, nil) } @@ -180,8 +172,5 @@ func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) { } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 { updates = append(updates, "resolver returned new addresses") } - channelz.AddTraceEvent(logger, ccr.cc.channelzID, 0, &channelz.TraceEventDesc{ - Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")), - Severity: channelz.CtInfo, - }) + channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; ")) } diff --git a/server.go b/server.go index b24b6d53958d..db60cb3251f9 100644 --- a/server.go +++ b/server.go @@ -134,7 +134,7 @@ type Server struct { channelzRemoveOnce sync.Once serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop - channelzID int64 // channelz unique identification number + channelzID *channelz.Identifier czData *channelzData serverWorkerChannels []chan *serverWorkerData @@ -584,9 +584,7 @@ func NewServer(opt ...ServerOption) *Server { s.initServerWorkers() } - if channelz.IsOn() { - s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") - } + s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") return s } @@ -712,7 +710,7 @@ var ErrServerStopped = errors.New("grpc: the server has been stopped") type listenSocket struct { net.Listener - channelzID int64 + channelzID *channelz.Identifier } func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { @@ -724,9 +722,7 @@ func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { func (l *listenSocket) Close() error { err := l.Listener.Close() - if channelz.IsOn() { - channelz.RemoveEntry(l.channelzID) - } + channelz.RemoveEntry(l.channelzID) return err } @@ -737,44 +733,50 @@ func (l *listenSocket) Close() error { // this method returns. // Serve will return a non-nil error unless Stop or GracefulStop is called. func (s *Server) Serve(lis net.Listener) error { - s.mu.Lock() - s.printf("serving") - s.serve = true - if s.lis == nil { - // Serve called after Stop or GracefulStop. - s.mu.Unlock() - lis.Close() - return ErrServerStopped - } + if err := func() error { // Anonymous func to be able to defer the unlock. + s.mu.Lock() + defer s.mu.Unlock() - s.serveWG.Add(1) - defer func() { - s.serveWG.Done() - if s.quit.HasFired() { - // Stop or GracefulStop called; block until done and return nil. - <-s.done.Done() + s.printf("serving") + s.serve = true + if s.lis == nil { + lis.Close() + return ErrServerStopped } - }() - ls := &listenSocket{Listener: lis} - s.lis[ls] = true + s.serveWG.Add(1) + defer func() { + s.serveWG.Done() + if s.quit.HasFired() { + // Stop or GracefulStop called; block until done and return nil. + <-s.done.Done() + } + }() - if channelz.IsOn() { - ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String()) - } - s.mu.Unlock() + ls := &listenSocket{Listener: lis} + s.lis[ls] = true - defer func() { - s.mu.Lock() - if s.lis != nil && s.lis[ls] { - ls.Close() - delete(s.lis, ls) + var err error + ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String()) + if err != nil { + lis.Close() + return err } - s.mu.Unlock() - }() - var tempDelay time.Duration // how long to sleep on accept failure + defer func() { + s.mu.Lock() + if s.lis != nil && s.lis[ls] { + ls.Close() + delete(s.lis, ls) + } + s.mu.Unlock() + }() + return nil + }(); err != nil { + return err + } + var tempDelay time.Duration // how long to sleep on accept failure for { rawConn, err := lis.Accept() if err != nil { @@ -1709,11 +1711,7 @@ func (s *Server) Stop() { s.done.Fire() }() - s.channelzRemoveOnce.Do(func() { - if channelz.IsOn() { - channelz.RemoveEntry(s.channelzID) - } - }) + s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) }) s.mu.Lock() listeners := s.lis @@ -1751,11 +1749,7 @@ func (s *Server) GracefulStop() { s.quit.Fire() defer s.done.Fire() - s.channelzRemoveOnce.Do(func() { - if channelz.IsOn() { - channelz.RemoveEntry(s.channelzID) - } - }) + s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) }) s.mu.Lock() if s.conns == nil { s.mu.Unlock() diff --git a/test/channelz_test.go b/test/channelz_test.go index 3c953f1a5e80..a69b2839ead6 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -23,12 +23,13 @@ import ( "crypto/tls" "fmt" "net" - "reflect" + "regexp" "strings" "sync" "testing" "time" + "github.com/google/go-cmp/cmp" "golang.org/x/net/http2" "google.golang.org/grpc" _ "google.golang.org/grpc/balancer/grpclb" @@ -455,11 +456,11 @@ func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) { // Socket1 Socket2 czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) - topChanID := channelz.RegisterChannel(&dummyChannel{}, 0, "") - subChanID1 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "") - subChanID2 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "") - sktID1 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "") - sktID2 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "") + topChanID := channelz.RegisterChannel(&dummyChannel{}, nil, "") + subChanID1, _ := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "") + subChanID2, _ := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "") + sktID1, _ := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "") + sktID2, _ := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "") tcs, _ := channelz.GetTopChannels(0, 0) if tcs == nil || len(tcs) != 1 { @@ -468,7 +469,7 @@ func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) { if len(tcs[0].SubChans) != 2 { t.Fatalf("There should be two SubChannel entries") } - sc := channelz.GetSubChannel(subChanID1) + sc := channelz.GetSubChannel(subChanID1.Int()) if sc == nil || len(sc.Sockets) != 2 { t.Fatalf("There should be two Socket entries") } @@ -1380,7 +1381,7 @@ func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) { if !ok { return false, fmt.Errorf("the SocketData.Security is of type: %T, want: *credentials.TLSChannelzSecurityValue", skt.SocketData.Security) } - if !reflect.DeepEqual(securityVal.RemoteCertificate, cert.Certificate[0]) { + if !cmp.Equal(securityVal.RemoteCertificate, cert.Certificate[0]) { return false, fmt.Errorf("SocketData.Security.RemoteCertificate got: %v, want: %v", securityVal.RemoteCertificate, cert.Certificate[0]) } for _, v := range cipherSuites { @@ -1397,6 +1398,7 @@ func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) { func (s) TestCZChannelTraceCreationDeletion(t *testing.T) { czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) + e := tcpClearRREnv // avoid calling API to set balancer type, which will void service config's change of balancer. e.balancer = "" @@ -1407,6 +1409,7 @@ func (s) TestCZChannelTraceCreationDeletion(t *testing.T) { te.resolverScheme = r.Scheme() te.clientConn(grpc.WithResolvers(r)) defer te.tearDown() + var nestedConn int64 if err := verifyResultWithDelay(func() (bool, error) { tcs, _ := channelz.GetTopChannels(0, 0) @@ -1431,8 +1434,9 @@ func (s) TestCZChannelTraceCreationDeletion(t *testing.T) { if len(ncm.Trace.Events) == 0 { return false, fmt.Errorf("there should be at least one trace event for nested channel not 0") } - if ncm.Trace.Events[0].Desc != "Channel Created" { - return false, fmt.Errorf("the first trace event should be \"Channel Created\", not %q", ncm.Trace.Events[0].Desc) + pattern := `Channel\(id:[0-9]+\) created` + if ok, _ := regexp.MatchString(pattern, ncm.Trace.Events[0].Desc); !ok { + return false, fmt.Errorf("the first trace event should be %q, not %q", pattern, ncm.Trace.Events[0].Desc) } return true, nil }); err != nil { @@ -1460,8 +1464,9 @@ func (s) TestCZChannelTraceCreationDeletion(t *testing.T) { if len(ncm.Trace.Events) == 0 { return false, fmt.Errorf("there should be at least one trace event for nested channel not 0") } - if ncm.Trace.Events[len(ncm.Trace.Events)-1].Desc != "Channel Deleted" { - return false, fmt.Errorf("the first trace event should be \"Channel Deleted\", not %q", ncm.Trace.Events[0].Desc) + pattern := `Channel\(id:[0-9]+\) created` + if ok, _ := regexp.MatchString(pattern, ncm.Trace.Events[0].Desc); !ok { + return false, fmt.Errorf("the first trace event should be %q, not %q", pattern, ncm.Trace.Events[0].Desc) } return true, nil }); err != nil { @@ -1509,8 +1514,9 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) { if len(scm.Trace.Events) == 0 { return false, fmt.Errorf("there should be at least one trace event for subChannel not 0") } - if scm.Trace.Events[0].Desc != "Subchannel Created" { - return false, fmt.Errorf("the first trace event should be \"Subchannel Created\", not %q", scm.Trace.Events[0].Desc) + pattern := `Subchannel\(id:[0-9]+\) created` + if ok, _ := regexp.MatchString(pattern, scm.Trace.Events[0].Desc); !ok { + return false, fmt.Errorf("the first trace event should be %q, not %q", pattern, scm.Trace.Events[0].Desc) } return true, nil }); err != nil { @@ -1551,10 +1557,12 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) { if len(scm.Trace.Events) == 0 { return false, fmt.Errorf("there should be at least one trace event for subChannel not 0") } - if got, want := scm.Trace.Events[len(scm.Trace.Events)-1].Desc, "Subchannel Deleted"; got != want { - return false, fmt.Errorf("the last trace event should be %q, not %q", want, got) - } + pattern := `Subchannel\(id:[0-9]+\) deleted` + desc := scm.Trace.Events[len(scm.Trace.Events)-1].Desc + if ok, _ := regexp.MatchString(pattern, desc); !ok { + return false, fmt.Errorf("the last trace event should be %q, not %q", pattern, desc) + } return true, nil }); err != nil { t.Fatal(err) @@ -1600,7 +1608,7 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { cm := channelz.GetChannel(cid) for i := len(cm.Trace.Events) - 1; i >= 0; i-- { - if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name) { + if strings.Contains(cm.Trace.Events[i].Desc, fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name)) { break } if i == 0 { @@ -1725,7 +1733,7 @@ func (s) TestCZSubChannelPickedNewAddress(t *testing.T) { return false, fmt.Errorf("there should be at least one trace event for subChannel not 0") } for i := len(scm.Trace.Events) - 1; i >= 0; i-- { - if scm.Trace.Events[i].Desc == fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2]) { + if strings.Contains(scm.Trace.Events[i].Desc, fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2])) { break } if i == 0 { @@ -1756,9 +1764,9 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) { if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } - var subConn int64 te.srv.Stop() + var subConn int64 if err := verifyResultWithDelay(func() (bool, error) { // we need to obtain the SubChannel id before it gets deleted from Channel's children list (due // to effect of r.UpdateState(resolver.State{Addresses:[]resolver.Address{}})) @@ -1773,6 +1781,7 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) { for k := range tcs[0].SubChans { // get the SubChannel id for further trace inquiry. subConn = k + t.Logf("SubChannel Id is %d", subConn) } } scm := channelz.GetSubChannel(subConn) @@ -1786,8 +1795,10 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) { return false, fmt.Errorf("there should be at least one trace event for subChannel not 0") } var ready, connecting, transient, shutdown int + t.Log("SubChannel trace events seen so far...") for _, e := range scm.Trace.Events { - if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) { + t.Log(e.Desc) + if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure)) { transient++ } } @@ -1798,17 +1809,19 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) { } transient = 0 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}}) + t.Log("SubChannel trace events seen so far...") for _, e := range scm.Trace.Events { - if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) { + t.Log(e.Desc) + if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready)) { ready++ } - if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting) { + if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting)) { connecting++ } - if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) { + if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure)) { transient++ } - if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown) { + if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown)) { shutdown++ } } @@ -1851,6 +1864,7 @@ func (s) TestCZChannelConnectivityState(t *testing.T) { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } te.srv.Stop() + if err := verifyResultWithDelay(func() (bool, error) { tcs, _ := channelz.GetTopChannels(0, 0) if len(tcs) != 1 { @@ -1858,14 +1872,16 @@ func (s) TestCZChannelConnectivityState(t *testing.T) { } var ready, connecting, transient int + t.Log("Channel trace events seen so far...") for _, e := range tcs[0].Trace.Events { - if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready) { + t.Log(e.Desc) + if strings.Contains(e.Desc, fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready)) { ready++ } - if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting) { + if strings.Contains(e.Desc, fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting)) { connecting++ } - if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure) { + if strings.Contains(e.Desc, fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure)) { transient++ } } diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go index 5b3a2403e1a7..771152b7bb97 100644 --- a/xds/internal/balancer/clustermanager/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/clustermanager_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/balancergroup" + "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/hierarchy" "google.golang.org/grpc/internal/testutils" @@ -516,7 +517,6 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) { func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { const ( balancerName = "stubBalancer-TestClusterManagerForwardsBalancerBuildOptions" - parent = int64(1234) userAgent = "ua" defaultTestTimeout = 1 * time.Second ) @@ -526,7 +526,7 @@ func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { ccsCh := testutils.NewChannel() bOpts := balancer.BuildOptions{ DialCreds: insecure.NewCredentials(), - ChannelzParentID: parent, + ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefChannel, 1234, nil), CustomUserAgent: userAgent, } stub.Register(balancerName, stub.BalancerFuncs{