backport PRs to v1.41.x #4810

Merged: 2 commits into v1.41.x, Sep 23, 2021
8 changes: 5 additions & 3 deletions internal/transport/http2_server.go
@@ -139,9 +139,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
 		var err error
 		conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
 		if err != nil {
-			// ErrConnDispatched means that the connection was dispatched away from
-			// gRPC; those connections should be left open.
-			if err == credentials.ErrConnDispatched {
+			// ErrConnDispatched means that the connection was dispatched away
+			// from gRPC; those connections should be left open. io.EOF means
+			// the connection was closed before handshaking completed, which can
+			// happen naturally from probers. Return these errors directly.
+			if err == credentials.ErrConnDispatched || err == io.EOF {
 				return nil, err
 			}
 			return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
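Context for the io.EOF case: network health checkers ("probers") routinely open a TCP connection and close it without sending a byte, so a TLS-based ServerHandshake on the accepted conn reads EOF before any ClientHello arrives. A minimal sketch of such a prober, with a hypothetical target address; this is illustrative, not part of the PR:

```go
package main

import (
	"log"
	"net"
	"time"
)

// probe opens a raw TCP connection and closes it immediately without
// writing anything. On the server side, a credentials ServerHandshake
// that tries to read from this connection will see io.EOF.
func probe(addr string) error {
	conn, err := net.DialTimeout("tcp", addr, time.Second)
	if err != nil {
		return err
	}
	return conn.Close() // closed before any handshake bytes are sent
}

func main() {
	// "localhost:50051" is a hypothetical gRPC server address.
	if err := probe("localhost:50051"); err != nil {
		log.Printf("probe failed: %v", err)
	}
}
```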
7 changes: 6 additions & 1 deletion server.go
@@ -887,7 +887,12 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
 		if err != credentials.ErrConnDispatched {
 			c.Close()
 		}
-		channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
+		// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
+		if err != credentials.ErrConnDispatched {
+			if err != io.EOF {
+				channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
+			}
+		}
 		return nil
 	}

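The two nested checks above read a little awkwardly but behave identically to a single combined condition; a behavior-preserving sketch (not the code that was merged):

```go
// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
if err != credentials.ErrConnDispatched && err != io.EOF {
	channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
}
```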
9 changes: 8 additions & 1 deletion xds/internal/balancer/clustermanager/balancerstateaggregator.go
@@ -183,13 +183,18 @@ func (bsa *balancerStateAggregator) build() balancer.State {
 	// handling the special connecting after ready, as in UpdateState(). Then a
 	// function to calculate the aggregated connectivity state as in this
 	// function.
-	var readyN, connectingN int
+	//
+	// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
+	// state.
+	var readyN, connectingN, idleN int
 	for _, ps := range bsa.idToPickerState {
 		switch ps.stateToAggregate {
 		case connectivity.Ready:
 			readyN++
 		case connectivity.Connecting:
 			connectingN++
+		case connectivity.Idle:
+			idleN++
 		}
 	}
 	var aggregatedState connectivity.State
@@ -198,6 +203,8 @@ func (bsa *balancerStateAggregator) build() balancer.State {
 		aggregatedState = connectivity.Ready
 	case connectingN > 0:
 		aggregatedState = connectivity.Connecting
+	case idleN > 0:
+		aggregatedState = connectivity.Idle
 	default:
 		aggregatedState = connectivity.TransientFailure
 	}
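Together, both hunks give the aggregator the precedence Ready > Connecting > Idle > TransientFailure. A self-contained sketch of that rule, using a hypothetical `aggregate` helper rather than the aggregator's real API:

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/connectivity"
)

// aggregate mirrors the precedence implemented in build(): any Ready child
// wins, then Connecting, then Idle; otherwise the aggregate is
// TransientFailure.
func aggregate(states []connectivity.State) connectivity.State {
	var readyN, connectingN, idleN int
	for _, s := range states {
		switch s {
		case connectivity.Ready:
			readyN++
		case connectivity.Connecting:
			connectingN++
		case connectivity.Idle:
			idleN++
		}
	}
	switch {
	case readyN > 0:
		return connectivity.Ready
	case connectingN > 0:
		return connectivity.Connecting
	case idleN > 0:
		return connectivity.Idle
	default:
		return connectivity.TransientFailure
	}
}

func main() {
	// A single Idle child aggregates to Idle, which is exactly what the
	// TestInitialIdle tests below assert.
	fmt.Println(aggregate([]connectivity.State{connectivity.Idle})) // IDLE
	// Any Connecting child outranks Idle.
	fmt.Println(aggregate([]connectivity.State{connectivity.Idle, connectivity.Connecting})) // CONNECTING
}
```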
65 changes: 65 additions & 0 deletions xds/internal/balancer/clustermanager/clustermanager_test.go
@@ -565,3 +565,68 @@ func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) {
t.Fatal(err2)
}
}

const initIdleBalancerName = "test-init-Idle-balancer"

var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")

func init() {
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
err := fmt.Errorf("wrong picker error")
if state.ConnectivityState == connectivity.Idle {
err = errTestInitIdle
}
bd.ClientConn.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &testutils.TestConstPicker{Err: err},
})
},
})
}

// TestInitialIdle verifies that when the only child reports Idle, the
// aggregated state is Idle.
func TestInitialIdle(t *testing.T) {
cc := testutils.NewTestClientConn(t)
rtb := rtBuilder.Build(cc, balancer.BuildOptions{})

configJSON1 := `{
"children": {
"cds:cluster_1":{ "childPolicy": [{"test-init-Idle-balancer":""}] }
}
}`

config1, err := rtParser.ParseConfig([]byte(configJSON1))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}

// Send the config, and an address with hierarchy path ["cds:cluster_1"].
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], Attributes: nil},
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}

// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
for range wantAddrs {
sc := <-cc.NewSubConnCh
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
}

if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
t.Fatalf("Received aggregated state: %v, want Idle", state1)
}
}
8 changes: 7 additions & 1 deletion xds/internal/balancer/weightedtarget/weightedaggregator/aggregator.go
@@ -200,7 +200,9 @@ func (wbsa *Aggregator) BuildAndUpdate() {
 func (wbsa *Aggregator) build() balancer.State {
 	wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)
 	m := wbsa.idToPickerState
-	var readyN, connectingN int
+	// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
+	// state.
+	var readyN, connectingN, idleN int
 	readyPickerWithWeights := make([]weightedPickerState, 0, len(m))
 	for _, ps := range m {
 		switch ps.stateToAggregate {
@@ -209,6 +211,8 @@ func (wbsa *Aggregator) build() balancer.State {
 			readyPickerWithWeights = append(readyPickerWithWeights, *ps)
 		case connectivity.Connecting:
 			connectingN++
+		case connectivity.Idle:
+			idleN++
 		}
 	}
 	var aggregatedState connectivity.State
@@ -217,6 +221,8 @@ func (wbsa *Aggregator) build() balancer.State {
 		aggregatedState = connectivity.Ready
 	case connectingN > 0:
 		aggregatedState = connectivity.Connecting
+	case idleN > 0:
+		aggregatedState = connectivity.Idle
 	default:
 		aggregatedState = connectivity.TransientFailure
 	}
61 changes: 61 additions & 0 deletions xds/internal/balancer/weightedtarget/weightedtarget_test.go
@@ -29,6 +29,7 @@ import (
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/balancer/roundrobin"
 	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/internal/balancer/stub"
 	"google.golang.org/grpc/internal/hierarchy"
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/serviceconfig"
@@ -263,3 +264,63 @@ func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
return scst.SubConn
}
}

const initIdleBalancerName = "test-init-Idle-balancer"

var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")

func init() {
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
err := fmt.Errorf("wrong picker error")
if state.ConnectivityState == connectivity.Idle {
err = errTestInitIdle
}
bd.ClientConn.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &testutils.TestConstPicker{Err: err},
})
},
})
}

// TestInitialIdle verifies that when the only child reports Idle, the
// aggregated state is Idle.
func TestInitialIdle(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})

// Start with "cluster_1: test-init-Idle-balancer".
config1, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_1":{"weight":1,"childPolicy":[{"test-init-Idle-balancer":""}]}}}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}

// Send the config, and an address with hierarchy path ["cds:cluster_1"].
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], Attributes: nil},
}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}

// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
for range wantAddrs {
sc := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
}

if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
t.Fatalf("Received aggregated state: %v, want Idle", state1)
}
}