diff --git a/balancer/balancer.go b/balancer/balancer.go
index f7a7697cad0..25713908072 100644
--- a/balancer/balancer.go
+++ b/balancer/balancer.go
@@ -371,56 +371,3 @@ type ClientConnState struct {
 // ErrBadResolverState may be returned by UpdateClientConnState to indicate a
 // problem with the provided name resolver data.
 var ErrBadResolverState = errors.New("bad resolver state")
-
-// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
-// and returns one aggregated connectivity state.
-//
-// It's not thread safe.
-type ConnectivityStateEvaluator struct {
-	numReady            uint64 // Number of addrConns in ready state.
-	numConnecting       uint64 // Number of addrConns in connecting state.
-	numTransientFailure uint64 // Number of addrConns in transient failure state.
-	numIdle             uint64 // Number of addrConns in idle state.
-}
-
-// RecordTransition records state change happening in subConn and based on that
-// it evaluates what aggregated state should be.
-//
-// - If at least one SubConn in Ready, the aggregated state is Ready;
-// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
-// - Else if at least one SubConn is TransientFailure, the aggregated state is Transient Failure;
-// - Else if at least one SubConn is Idle, the aggregated state is Idle;
-// - Else there are no subconns and the aggregated state is Transient Failure
-//
-// Shutdown is not considered.
-func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
-	// Update counters.
-	for idx, state := range []connectivity.State{oldState, newState} {
-		updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
-		switch state {
-		case connectivity.Ready:
-			cse.numReady += updateVal
-		case connectivity.Connecting:
-			cse.numConnecting += updateVal
-		case connectivity.TransientFailure:
-			cse.numTransientFailure += updateVal
-		case connectivity.Idle:
-			cse.numIdle += updateVal
-		}
-	}
-
-	// Evaluate.
-	if cse.numReady > 0 {
-		return connectivity.Ready
-	}
-	if cse.numConnecting > 0 {
-		return connectivity.Connecting
-	}
-	if cse.numTransientFailure > 0 {
-		return connectivity.TransientFailure
-	}
-	if cse.numIdle > 0 {
-		return connectivity.Idle
-	}
-	return connectivity.TransientFailure
-}
diff --git a/balancer/conn_state_evaluator.go b/balancer/conn_state_evaluator.go
new file mode 100644
index 00000000000..a87b6809af3
--- /dev/null
+++ b/balancer/conn_state_evaluator.go
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright 2022 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package balancer
+
+import "google.golang.org/grpc/connectivity"
+
+// ConnectivityStateEvaluator aggregates the connectivity states of multiple
+// SubConns into one connectivity state, using one counter per tracked state.
+//
+// It's not thread safe.
+type ConnectivityStateEvaluator struct {
+	numReady            uint64 // Number of addrConns in ready state.
+	numConnecting       uint64 // Number of addrConns in connecting state.
+	numTransientFailure uint64 // Number of addrConns in transient failure state.
+	numIdle             uint64 // Number of addrConns in idle state.
+}
+
+// RecordTransition accounts for one SubConn moving from oldState to newState
+// and returns the aggregated state that results:
+//
+//   - Ready if at least one SubConn is Ready;
+//   - else Connecting if at least one SubConn is Connecting;
+//   - else Idle if at least one SubConn is Idle;
+//   - else TransientFailure (a SubConn is in TransientFailure, or there are
+//     no SubConns at all).
+//
+// Shutdown has no counter, so it is ignored on either side of the transition.
+func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
+	// Leave oldState first, then enter newState.
+	for i, st := range [...]connectivity.State{oldState, newState} {
+		var counter *uint64
+		switch st {
+		case connectivity.Ready:
+			counter = &cse.numReady
+		case connectivity.Connecting:
+			counter = &cse.numConnecting
+		case connectivity.TransientFailure:
+			counter = &cse.numTransientFailure
+		case connectivity.Idle:
+			counter = &cse.numIdle
+		default:
+			// States without a counter (such as Shutdown) are skipped.
+			continue
+		}
+		if i == 0 {
+			*counter-- // Unsigned counter: decrementing zero wraps around.
+		} else {
+			*counter++
+		}
+	}
+
+	// Report the first state, in priority order, whose counter is non-zero.
+	switch {
+	case cse.numReady > 0:
+		return connectivity.Ready
+	case cse.numConnecting > 0:
+		return connectivity.Connecting
+	case cse.numIdle > 0:
+		return connectivity.Idle
+	default:
+		return connectivity.TransientFailure
+	}
+}
diff --git a/balancer/conn_state_evaluator_test.go b/balancer/conn_state_evaluator_test.go
new file mode 100644
index 00000000000..d82ddf84c24
--- /dev/null
+++ b/balancer/conn_state_evaluator_test.go
@@ -0,0 +1,245 @@
+/*
+ *
+ * Copyright 2022 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package balancer
+
+import (
+	"testing"
+
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/internal/grpctest"
+)
+
+type s struct {
+	grpctest.Tester
+}
+
+func Test(t *testing.T) {
+	grpctest.RunSubTests(t, s{})
+}
+
+// TestRecordTransition_FirstStateChange exercises the very first call to
+// RecordTransition on a fresh evaluator, passing `Shutdown` (a state the
+// ConnectivityStateEvaluator does not count) as the `oldState`.
+func (s) TestRecordTransition_FirstStateChange(t *testing.T) {
+	cases := []struct {
+		next connectivity.State
+		want connectivity.State
+	}{
+		{
+			next: connectivity.Idle,
+			want: connectivity.Idle,
+		},
+		{
+			next: connectivity.Connecting,
+			want: connectivity.Connecting,
+		},
+		{
+			next: connectivity.Ready,
+			want: connectivity.Ready,
+		},
+		{
+			next: connectivity.TransientFailure,
+			want: connectivity.TransientFailure,
+		},
+		{
+			next: connectivity.Shutdown,
+			want: connectivity.TransientFailure,
+		},
+	}
+	for _, tc := range cases {
+		eval := &ConnectivityStateEvaluator{}
+		if got := eval.RecordTransition(connectivity.Shutdown, tc.next); got != tc.want {
+			t.Fatalf("RecordTransition(%v, %v) = %v, want %v", connectivity.Shutdown, tc.next, got, tc.want)
+		}
+	}
+}
+
+// TestRecordTransition_SameState covers a single subConn whose transition
+// into one and the same state is recorded several times in a row.
+func (s) TestRecordTransition_SameState(t *testing.T) {
+	cases := []struct {
+		next connectivity.State
+		want connectivity.State
+	}{
+		{
+			next: connectivity.Idle,
+			want: connectivity.Idle,
+		},
+		{
+			next: connectivity.Connecting,
+			want: connectivity.Connecting,
+		},
+		{
+			next: connectivity.Ready,
+			want: connectivity.Ready,
+		},
+		{
+			next: connectivity.TransientFailure,
+			want: connectivity.TransientFailure,
+		},
+		{
+			next: connectivity.Shutdown,
+			want: connectivity.TransientFailure,
+		},
+	}
+	const repeats = 5
+	for _, tc := range cases {
+		eval := &ConnectivityStateEvaluator{}
+		var prev, got connectivity.State
+		prev = connectivity.Shutdown
+		for i := 0; i < repeats; i++ {
+			got = eval.RecordTransition(prev, tc.next)
+			prev = tc.next
+		}
+		if got != tc.want {
+			t.Fatalf("RecordTransition() = %v, want %v", got, tc.want)
+		}
+	}
+}
+
+// TestRecordTransition_SingleSubConn_DifferentStates walks one subConn through
+// several common sequences of connectivity state changes.
+func (s) TestRecordTransition_SingleSubConn_DifferentStates(t *testing.T) {
+	tests := []struct {
+		name      string
+		states    []connectivity.State
+		wantState connectivity.State
+	}{
+		{
+			name:      "regular transition to ready",
+			states:    []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.Ready},
+			wantState: connectivity.Ready,
+		},
+		{
+			name:      "regular transition to transient failure",
+			states:    []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.TransientFailure},
+			wantState: connectivity.TransientFailure,
+		},
+		{
+			name:      "transition from ready to idle",
+			states:    []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.Idle},
+			wantState: connectivity.Idle,
+		},
+		{
+			name:      "transition from ready to transient failure",
+			states:    []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.TransientFailure},
+			wantState: connectivity.TransientFailure,
+		},
+		{
+			name:      "transition from transient failure back to ready",
+			states:    []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.TransientFailure, connectivity.Ready},
+			wantState: connectivity.Ready,
+		},
+		{
+			// This state transition is usually suppressed at the LB policy level, by
+			// not calling RecordTransition.
+			name:      "transition from transient failure back to idle",
+			states:    []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.TransientFailure, connectivity.Idle},
+			wantState: connectivity.Idle,
+		},
+		{
+			// This state transition is usually suppressed at the LB policy level, by
+			// not calling RecordTransition.
+			name:      "transition from transient failure back to connecting",
+			states:    []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.TransientFailure, connectivity.Connecting},
+			wantState: connectivity.Connecting,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			cse := &ConnectivityStateEvaluator{}
+			var prevState, gotState connectivity.State
+			prevState = connectivity.Shutdown // A new subConn starts from the uncounted Shutdown state.
+			for _, newState := range test.states {
+				gotState = cse.RecordTransition(prevState, newState)
+				prevState = newState
+			}
+			if gotState != test.wantState { // Only the state after the last transition is checked.
+				t.Fatalf("RecordTransition() = %v, want %v", gotState, test.wantState)
+			}
+		})
+	}
+}
+
+// TestRecordTransition_MultipleSubConns_DifferentStates tests state transitions
+// among multiple subConns, and verifies that the connectivity state aggregation
+// algorithm produces the expected aggregate connectivity state.
+func (s) TestRecordTransition_MultipleSubConns_DifferentStates(t *testing.T) {
+	tests := []struct {
+		name string
+		// Each entry in this slice corresponds to the state changes happening on an
+		// individual subConn.
+		subConnStates [][]connectivity.State
+		wantState     connectivity.State
+	}{
+		{
+			name: "atleast one ready",
+			subConnStates: [][]connectivity.State{
+				{connectivity.Idle, connectivity.Connecting, connectivity.Ready},
+				{connectivity.Idle},
+				{connectivity.Idle, connectivity.Connecting},
+				{connectivity.Idle, connectivity.Connecting, connectivity.TransientFailure},
+			},
+			wantState: connectivity.Ready,
+		},
+		{
+			name: "atleast one connecting",
+			subConnStates: [][]connectivity.State{
+				{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.Connecting},
+				{connectivity.Idle},
+				{connectivity.Idle, connectivity.Connecting, connectivity.TransientFailure},
+			},
+			wantState: connectivity.Connecting,
+		},
+		{
+			name: "atleast one idle",
+			subConnStates: [][]connectivity.State{
+				{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.Idle},
+				{connectivity.Idle, connectivity.Connecting, connectivity.TransientFailure},
+			},
+			wantState: connectivity.Idle,
+		},
+		{
+			name: "atleast one transient failure",
+			subConnStates: [][]connectivity.State{
+				{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.TransientFailure},
+				{connectivity.TransientFailure},
+			},
+			wantState: connectivity.TransientFailure,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			cse := &ConnectivityStateEvaluator{} // One evaluator shared by all subConns of this case.
+			var prevState, gotState connectivity.State
+			for _, scStates := range test.subConnStates {
+				prevState = connectivity.Shutdown // Each subConn starts from the uncounted Shutdown state.
+				for _, newState := range scStates {
+					gotState = cse.RecordTransition(prevState, newState)
+					prevState = newState
+				}
+			}
+			if gotState != test.wantState { // Aggregate after the final recorded transition.
+				t.Fatalf("RecordTransition() = %v, want %v", gotState, test.wantState)
+			}
+		})
+	}
+}