diff --git a/channelz/service/service_sktopt_test.go b/channelz/service/service_sktopt_test.go index 4ea6b20cd6a..efd383fce3c 100644 --- a/channelz/service/service_sktopt_test.go +++ b/channelz/service/service_sktopt_test.go @@ -126,7 +126,7 @@ func protoToSocketOption(skopts []*channelzpb.SocketOption) *channelz.SocketOpti } func (s) TestGetSocketOptions(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer cleanupWrapper(czCleanup, t) ss := []*dummySocket{ { diff --git a/channelz/service/service_test.go b/channelz/service/service_test.go index 03d2b29c27b..17409533745 100644 --- a/channelz/service/service_test.go +++ b/channelz/service/service_test.go @@ -322,7 +322,7 @@ func (s) TestGetTopChannels(t *testing.T) { }, {}, } - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer cleanupWrapper(czCleanup, t) for _, c := range tcs { id := channelz.RegisterChannel(c, 0, "") @@ -371,7 +371,7 @@ func (s) TestGetServers(t *testing.T) { lastCallStartedTimestamp: time.Now().UTC(), }, } - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer cleanupWrapper(czCleanup, t) for _, s := range ss { id := channelz.RegisterServer(s, "") @@ -400,7 +400,7 @@ func (s) TestGetServers(t *testing.T) { } func (s) TestGetServerSockets(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer cleanupWrapper(czCleanup, t) svrID := channelz.RegisterServer(&dummyServer{}, "") defer channelz.RemoveEntry(svrID) @@ -441,7 +441,7 @@ func (s) TestGetServerSockets(t *testing.T) { // This test makes a GetServerSockets with a non-zero start ID, and expect only // sockets with ID >= the given start ID. func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer cleanupWrapper(czCleanup, t) svrID := channelz.RegisterServer(&dummyServer{}, "") defer channelz.RemoveEntry(svrID) @@ -473,7 +473,7 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) { } func (s) TestGetChannel(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer cleanupWrapper(czCleanup, t) refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"} ids := make([]int64, 4) @@ -578,7 +578,7 @@ func (s) TestGetSubChannel(t *testing.T) { subchanConnectivityChange = fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) subChanPickNewAddress = fmt.Sprintf("Subchannel picks a new address %q to connect", "0.0.0.0") ) - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer cleanupWrapper(czCleanup, t) refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"} ids := make([]int64, 4) @@ -652,7 +652,7 @@ func (s) TestGetSubChannel(t *testing.T) { } func (s) TestGetSocket(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer cleanupWrapper(czCleanup, t) ss := []*dummySocket{ { diff --git a/internal/channelz/funcs.go b/internal/channelz/funcs.go index cd1807543ee..ea660a147cf 100644 --- a/internal/channelz/funcs.go +++ b/internal/channelz/funcs.go @@ -24,6 +24,7 @@ package channelz import ( + "context" "fmt" "sort" "sync" @@ -49,7 +50,8 @@ var ( // TurnOn turns on channelz data collection. func TurnOn() { if !IsOn() { - NewChannelzStorage() + db.set(newChannelMap()) + idGen.reset() atomic.StoreInt32(&curState, 1) } } @@ -94,46 +96,40 @@ func (d *dbWrapper) get() *channelMap { return d.DB } -// NewChannelzStorage initializes channelz data storage and id generator. +// NewChannelzStorageForTesting initializes channelz data storage and id +// generator for testing purposes. // -// This function returns a cleanup function to wait for all channelz state to be reset by the -// grpc goroutines when those entities get closed. By using this cleanup function, we make sure tests -// don't mess up each other, i.e. lingering goroutine from previous test doing entity removal happen -// to remove some entity just register by the new test, since the id space is the same. -// -// Note: This function is exported for testing purpose only. User should not call -// it in most cases. -func NewChannelzStorage() (cleanup func() error) { - db.set(&channelMap{ - topLevelChannels: make(map[int64]struct{}), - channels: make(map[int64]*channel), - listenSockets: make(map[int64]*listenSocket), - normalSockets: make(map[int64]*normalSocket), - servers: make(map[int64]*server), - subChannels: make(map[int64]*subChannel), - }) +// Returns a cleanup function to be invoked by the test, which waits for up to +// 10s for all channelz state to be reset by the grpc goroutines when those +// entities get closed. This cleanup function helps with ensuring that tests +// don't mess up each other. +func NewChannelzStorageForTesting() (cleanup func() error) { + db.set(newChannelMap()) idGen.reset() + return func() error { - var err error cm := db.get() if cm == nil { return nil } - for i := 0; i < 1000; i++ { - cm.mu.Lock() - if len(cm.topLevelChannels) == 0 && len(cm.servers) == 0 && len(cm.channels) == 0 && len(cm.subChannels) == 0 && len(cm.listenSockets) == 0 && len(cm.normalSockets) == 0 { - cm.mu.Unlock() - // all things stored in the channelz map have been cleared. + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + for { + cm.mu.RLock() + topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets := len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets) + cm.mu.RUnlock() + + if err := ctx.Err(); err != nil { + return fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets) + } + if topLevelChannels == 0 && servers == 0 && channels == 0 && subChannels == 0 && listenSockets == 0 && normalSockets == 0 { return nil } - cm.mu.Unlock() - time.Sleep(10 * time.Millisecond) + <-ticker.C } - - cm.mu.Lock() - err = fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets)) - cm.mu.Unlock() - return err } } @@ -326,6 +322,17 @@ type channelMap struct { normalSockets map[int64]*normalSocket } +func newChannelMap() *channelMap { + return &channelMap{ + topLevelChannels: make(map[int64]struct{}), + channels: make(map[int64]*channel), + listenSockets: make(map[int64]*listenSocket), + normalSockets: make(map[int64]*normalSocket), + servers: make(map[int64]*server), + subChannels: make(map[int64]*subChannel), + } +} + func (c *channelMap) addServer(id int64, s *server) { c.mu.Lock() s.cm = c diff --git a/test/channelz_linux_test.go b/test/channelz_linux_test.go index aa6febe537a..0eef08df3c9 100644 --- a/test/channelz_linux_test.go +++ b/test/channelz_linux_test.go @@ -38,7 +38,7 @@ func (s) TestCZSocketMetricsSocketOption(t *testing.T) { } func testCZSocketMetricsSocketOption(t *testing.T, e env) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) te := newTest(t, e) te.startServer(&testServer{security: e.security}) diff --git a/test/channelz_test.go b/test/channelz_test.go index 6cb09dd8d89..3c953f1a5e8 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -82,7 +82,7 @@ func (s) TestCZServerRegistrationAndDeletion(t *testing.T) { } for _, c := range testcases { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -101,7 +101,7 @@ func (s) TestCZServerRegistrationAndDeletion(t *testing.T) { } func (s) TestCZGetServer(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -153,7 +153,7 @@ func (s) TestCZTopChannelRegistrationAndDeletion(t *testing.T) { } for _, c := range testcases { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -191,7 +191,7 @@ func (s) TestCZTopChannelRegistrationAndDeletion(t *testing.T) { } func (s) TestCZTopChannelRegistrationAndDeletionWhenDialFail(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) // Make dial fails (due to no transport security specified) _, err := grpc.Dial("fake.addr") @@ -204,7 +204,7 @@ func (s) TestCZTopChannelRegistrationAndDeletionWhenDialFail(t *testing.T) { } func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv // avoid calling API to set balancer type, which will void service config's change of balancer. @@ -248,7 +248,7 @@ func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) { } func (s) TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv num := 3 // number of backends @@ -336,7 +336,7 @@ func (s) TestCZServerSocketRegistrationAndDeletion(t *testing.T) { } for _, c := range testcases { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -396,7 +396,7 @@ func (s) TestCZServerSocketRegistrationAndDeletion(t *testing.T) { } func (s) TestCZServerListenSocketDeletion(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) s := grpc.NewServer() lis, err := net.Listen("tcp", "localhost:0") @@ -453,7 +453,7 @@ func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) { // | | // v v // Socket1 Socket2 - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) topChanID := channelz.RegisterChannel(&dummyChannel{}, 0, "") subChanID1 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "") @@ -498,7 +498,7 @@ func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) { } func (s) TestCZChannelMetrics(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv num := 3 // number of backends @@ -586,7 +586,7 @@ func (s) TestCZChannelMetrics(t *testing.T) { } func (s) TestCZServerMetrics(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -862,7 +862,7 @@ func doIdleCallToInvokeKeepAlive(tc testpb.TestServiceClient, t *testing.T) { } func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -962,7 +962,7 @@ func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) { // It is separated from other cases due to setup incompatibly, i.e. max receive // size violation will mask flow control violation. func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -1046,7 +1046,7 @@ func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *t } func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -1159,7 +1159,7 @@ func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { } func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime) internal.KeepaliveMinPingTime = time.Second @@ -1212,7 +1212,7 @@ func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) { } func (s) TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -1273,7 +1273,7 @@ func (s) TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) { } func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -1345,7 +1345,7 @@ var cipherSuites = []string{ } func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpTLSRREnv te := newTest(t, e) @@ -1395,7 +1395,7 @@ func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) { } func (s) TestCZChannelTraceCreationDeletion(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv // avoid calling API to set balancer type, which will void service config's change of balancer. @@ -1470,7 +1470,7 @@ func (s) TestCZChannelTraceCreationDeletion(t *testing.T) { } func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -1562,7 +1562,7 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) { } func (s) TestCZChannelAddressResolutionChange(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv e.balancer = "" @@ -1665,7 +1665,7 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) { } func (s) TestCZSubChannelPickedNewAddress(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv e.balancer = "" @@ -1739,7 +1739,7 @@ func (s) TestCZSubChannelPickedNewAddress(t *testing.T) { } func (s) TestCZSubChannelConnectivityState(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -1833,7 +1833,7 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) { } func (s) TestCZChannelConnectivityState(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -1889,7 +1889,7 @@ func (s) TestCZChannelConnectivityState(t *testing.T) { } func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv // avoid newTest using WithBalancerName, which would override service @@ -1954,7 +1954,7 @@ func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) { } func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e) @@ -2014,7 +2014,7 @@ func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) { } func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) { - czCleanup := channelz.NewChannelzStorage() + czCleanup := channelz.NewChannelzStorageForTesting() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv te := newTest(t, e)