Skip to content

Commit

Permalink
channelz: rename NewChannelzStorage to NewChannelzStorageForTesting (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Feb 9, 2022
1 parent 0e05549 commit a354b1e
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 67 deletions.
2 changes: 1 addition & 1 deletion channelz/service/service_sktopt_test.go
Expand Up @@ -126,7 +126,7 @@ func protoToSocketOption(skopts []*channelzpb.SocketOption) *channelz.SocketOpti
}

func (s) TestGetSocketOptions(t *testing.T) {
czCleanup := channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
ss := []*dummySocket{
{
Expand Down
14 changes: 7 additions & 7 deletions channelz/service/service_test.go
Expand Up @@ -322,7 +322,7 @@ func (s) TestGetTopChannels(t *testing.T) {
},
{},
}
czCleanup := channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
for _, c := range tcs {
id := channelz.RegisterChannel(c, 0, "")
Expand Down Expand Up @@ -371,7 +371,7 @@ func (s) TestGetServers(t *testing.T) {
lastCallStartedTimestamp: time.Now().UTC(),
},
}
czCleanup := channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
for _, s := range ss {
id := channelz.RegisterServer(s, "")
Expand Down Expand Up @@ -400,7 +400,7 @@ func (s) TestGetServers(t *testing.T) {
}

func (s) TestGetServerSockets(t *testing.T) {
czCleanup := channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
Expand Down Expand Up @@ -441,7 +441,7 @@ func (s) TestGetServerSockets(t *testing.T) {
// This test makes a GetServerSockets with a non-zero start ID, and expect only
// sockets with ID >= the given start ID.
func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) {
czCleanup := channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
Expand Down Expand Up @@ -473,7 +473,7 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) {
}

func (s) TestGetChannel(t *testing.T) {
czCleanup := channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"}
ids := make([]int64, 4)
Expand Down Expand Up @@ -578,7 +578,7 @@ func (s) TestGetSubChannel(t *testing.T) {
subchanConnectivityChange = fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready)
subChanPickNewAddress = fmt.Sprintf("Subchannel picks a new address %q to connect", "0.0.0.0")
)
czCleanup := channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"}
ids := make([]int64, 4)
Expand Down Expand Up @@ -652,7 +652,7 @@ func (s) TestGetSubChannel(t *testing.T) {
}

func (s) TestGetSocket(t *testing.T) {
czCleanup := channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
ss := []*dummySocket{
{
Expand Down
69 changes: 38 additions & 31 deletions internal/channelz/funcs.go
Expand Up @@ -24,6 +24,7 @@
package channelz

import (
"context"
"fmt"
"sort"
"sync"
Expand All @@ -49,7 +50,8 @@ var (
// TurnOn turns on channelz data collection.
func TurnOn() {
if !IsOn() {
NewChannelzStorage()
db.set(newChannelMap())
idGen.reset()
atomic.StoreInt32(&curState, 1)
}
}
Expand Down Expand Up @@ -94,46 +96,40 @@ func (d *dbWrapper) get() *channelMap {
return d.DB
}

// NewChannelzStorage initializes channelz data storage and id generator.
// NewChannelzStorageForTesting initializes channelz data storage and id
// generator for testing purposes.
//
// This function returns a cleanup function to wait for all channelz state to be reset by the
// grpc goroutines when those entities get closed. By using this cleanup function, we make sure tests
// don't mess up each other, i.e. lingering goroutine from previous test doing entity removal happen
// to remove some entity just register by the new test, since the id space is the same.
//
// Note: This function is exported for testing purpose only. User should not call
// it in most cases.
func NewChannelzStorage() (cleanup func() error) {
db.set(&channelMap{
topLevelChannels: make(map[int64]struct{}),
channels: make(map[int64]*channel),
listenSockets: make(map[int64]*listenSocket),
normalSockets: make(map[int64]*normalSocket),
servers: make(map[int64]*server),
subChannels: make(map[int64]*subChannel),
})
// Returns a cleanup function to be invoked by the test, which waits for up to
// 10s for all channelz state to be reset by the grpc goroutines when those
// entities get closed. This cleanup function helps with ensuring that tests
// don't mess up each other.
func NewChannelzStorageForTesting() (cleanup func() error) {
db.set(newChannelMap())
idGen.reset()

return func() error {
var err error
cm := db.get()
if cm == nil {
return nil
}
for i := 0; i < 1000; i++ {
cm.mu.Lock()
if len(cm.topLevelChannels) == 0 && len(cm.servers) == 0 && len(cm.channels) == 0 && len(cm.subChannels) == 0 && len(cm.listenSockets) == 0 && len(cm.normalSockets) == 0 {
cm.mu.Unlock()
// all things stored in the channelz map have been cleared.

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
cm.mu.RLock()
topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets := len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets)
cm.mu.RUnlock()

if err := ctx.Err(); err != nil {
return fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets)
}
if topLevelChannels == 0 && servers == 0 && channels == 0 && subChannels == 0 && listenSockets == 0 && normalSockets == 0 {
return nil
}
cm.mu.Unlock()
time.Sleep(10 * time.Millisecond)
<-ticker.C
}

cm.mu.Lock()
err = fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets))
cm.mu.Unlock()
return err
}
}

Expand Down Expand Up @@ -326,6 +322,17 @@ type channelMap struct {
normalSockets map[int64]*normalSocket
}

func newChannelMap() *channelMap {
return &channelMap{
topLevelChannels: make(map[int64]struct{}),
channels: make(map[int64]*channel),
listenSockets: make(map[int64]*listenSocket),
normalSockets: make(map[int64]*normalSocket),
servers: make(map[int64]*server),
subChannels: make(map[int64]*subChannel),
}
}

func (c *channelMap) addServer(id int64, s *server) {
c.mu.Lock()
s.cm = c
Expand Down
2 changes: 1 addition & 1 deletion test/channelz_linux_test.go
Expand Up @@ -38,7 +38,7 @@ func (s) TestCZSocketMetricsSocketOption(t *testing.T) {
}

func testCZSocketMetricsSocketOption(t *testing.T, e env) {
czCleanup := channelz.NewChannelzStorage()
czCleanup := channelz.NewChannelzStorageForTesting()
defer czCleanupWrapper(czCleanup, t)
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
Expand Down

0 comments on commit a354b1e

Please sign in to comment.