diff --git a/attributes/attributes.go b/attributes/attributes.go index 3220d87be40..6ff2792ee4f 100644 --- a/attributes/attributes.go +++ b/attributes/attributes.go @@ -25,55 +25,75 @@ // later release. package attributes -import "fmt" - // Attributes is an immutable struct for storing and retrieving generic // key/value pairs. Keys must be hashable, and users should define their own -// types for keys. +// types for keys. Values should not be modified after they are added to an +// Attributes or if they were received from one. If values implement 'Equal(o +// interface{}) bool', it will be called by (*Attributes).Equal to determine +// whether two values with the same key should be considered equal. type Attributes struct { m map[interface{}]interface{} } -// New returns a new Attributes containing all key/value pairs in kvs. If the -// same key appears multiple times, the last value overwrites all previous -// values for that key. Panics if len(kvs) is not even. -func New(kvs ...interface{}) *Attributes { - if len(kvs)%2 != 0 { - panic(fmt.Sprintf("attributes.New called with unexpected input: len(kvs) = %v", len(kvs))) - } - a := &Attributes{m: make(map[interface{}]interface{}, len(kvs)/2)} - for i := 0; i < len(kvs)/2; i++ { - a.m[kvs[i*2]] = kvs[i*2+1] - } - return a +// New returns a new Attributes containing the key/value pair. +func New(key, value interface{}) *Attributes { + return &Attributes{m: map[interface{}]interface{}{key: value}} } -// WithValues returns a new Attributes containing all key/value pairs in a and -// kvs. Panics if len(kvs) is not even. If the same key appears multiple -// times, the last value overwrites all previous values for that key. To -// remove an existing key, use a nil value. -func (a *Attributes) WithValues(kvs ...interface{}) *Attributes { +// WithValue returns a new Attributes containing the previous keys and values +// and the new key/value pair. If the same key appears multiple times, the +// last value overwrites all previous values for that key. To remove an +// existing key, use a nil value. value should not be modified later. +func (a *Attributes) WithValue(key, value interface{}) *Attributes { if a == nil { - return New(kvs...) + return New(key, value) } - if len(kvs)%2 != 0 { - panic(fmt.Sprintf("attributes.New called with unexpected input: len(kvs) = %v", len(kvs))) - } - n := &Attributes{m: make(map[interface{}]interface{}, len(a.m)+len(kvs)/2)} + n := &Attributes{m: make(map[interface{}]interface{}, len(a.m)+1)} for k, v := range a.m { n.m[k] = v } - for i := 0; i < len(kvs)/2; i++ { - n.m[kvs[i*2]] = kvs[i*2+1] - } + n.m[key] = value return n } // Value returns the value associated with these attributes for key, or nil if -// no value is associated with key. +// no value is associated with key. The returned value should not be modified. func (a *Attributes) Value(key interface{}) interface{} { if a == nil { return nil } return a.m[key] } + +// Equal returns whether a and o are equivalent. If 'Equal(o interface{}) +// bool' is implemented for a value in the attributes, it is called to +// determine if the value matches the one stored in the other attributes. If +// Equal is not implemented, standard equality is used to determine if the two +// values are equal. +func (a *Attributes) Equal(o *Attributes) bool { + if a == nil && o == nil { + return true + } + if a == nil || o == nil { + return false + } + if len(a.m) != len(o.m) { + return false + } + for k, v := range a.m { + ov, ok := o.m[k] + if !ok { + // o missing element of a + return false + } + if eq, ok := v.(interface{ Equal(o interface{}) bool }); ok { + if !eq.Equal(ov) { + return false + } + } else if v != ov { + // Fallback to a standard equality check if Value is unimplemented. + return false + } + } + return true +} diff --git a/attributes/attributes_test.go b/attributes/attributes_test.go index 1174e2371a5..02d5b24f3df 100644 --- a/attributes/attributes_test.go +++ b/attributes/attributes_test.go @@ -20,41 +20,71 @@ package attributes_test import ( "fmt" - "reflect" "testing" "google.golang.org/grpc/attributes" ) +type stringVal struct { + s string +} + +func (s stringVal) Equal(o interface{}) bool { + os, ok := o.(stringVal) + return ok && s.s == os.s +} + func ExampleAttributes() { type keyOne struct{} type keyTwo struct{} - a := attributes.New(keyOne{}, 1, keyTwo{}, "two") + a := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "two"}) fmt.Println("Key one:", a.Value(keyOne{})) fmt.Println("Key two:", a.Value(keyTwo{})) // Output: // Key one: 1 - // Key two: two + // Key two: {two} } -func ExampleAttributes_WithValues() { +func ExampleAttributes_WithValue() { type keyOne struct{} type keyTwo struct{} a := attributes.New(keyOne{}, 1) - a = a.WithValues(keyTwo{}, "two") + a = a.WithValue(keyTwo{}, stringVal{s: "two"}) fmt.Println("Key one:", a.Value(keyOne{})) fmt.Println("Key two:", a.Value(keyTwo{})) // Output: // Key one: 1 - // Key two: two + // Key two: {two} } -// Test that two attributes with the same content are `reflect.DeepEqual`. -func TestDeepEqual(t *testing.T) { +// Test that two attributes with the same content are Equal. +func TestEqual(t *testing.T) { type keyOne struct{} - a1 := attributes.New(keyOne{}, 1) - a2 := attributes.New(keyOne{}, 1) - if !reflect.DeepEqual(a1, a2) { - t.Fatalf("reflect.DeepEqual(%+v, %+v), want true, got false", a1, a2) + type keyTwo struct{} + a1 := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "two"}) + a2 := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "two"}) + if !a1.Equal(a2) { + t.Fatalf("%+v.Equals(%+v) = false; want true", a1, a2) + } + if !a2.Equal(a1) { + t.Fatalf("%+v.Equals(%+v) = false; want true", a2, a1) + } +} + +// Test that two attributes with different content are not Equal. +func TestNotEqual(t *testing.T) { + type keyOne struct{} + type keyTwo struct{} + a1 := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "two"}) + a2 := attributes.New(keyOne{}, 2).WithValue(keyTwo{}, stringVal{s: "two"}) + a3 := attributes.New(keyOne{}, 1).WithValue(keyTwo{}, stringVal{s: "one"}) + if a1.Equal(a2) { + t.Fatalf("%+v.Equals(%+v) = true; want false", a1, a2) + } + if a2.Equal(a1) { + t.Fatalf("%+v.Equals(%+v) = true; want false", a2, a1) + } + if a3.Equal(a1) { + t.Fatalf("%+v.Equals(%+v) = true; want false", a3, a1) } } diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index 8dd504299fe..908c6e3376e 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" - "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" @@ -42,7 +41,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) cc: cc, pickerBuilder: bb.pickerBuilder, - subConns: make(map[resolver.Address]subConnInfo), + subConns: resolver.NewAddressMap(), scStates: make(map[balancer.SubConn]connectivity.State), csEvltr: &balancer.ConnectivityStateEvaluator{}, config: bb.config, @@ -58,11 +57,6 @@ func (bb *baseBuilder) Name() string { return bb.name } -type subConnInfo struct { - subConn balancer.SubConn - attrs *attributes.Attributes -} - type baseBalancer struct { cc balancer.ClientConn pickerBuilder PickerBuilder @@ -70,7 +64,7 @@ type baseBalancer struct { csEvltr *balancer.ConnectivityStateEvaluator state connectivity.State - subConns map[resolver.Address]subConnInfo // `attributes` is stripped from the keys of this map (the addresses) + subConns *resolver.AddressMap scStates map[balancer.SubConn]connectivity.State picker balancer.Picker config Config @@ -81,7 +75,7 @@ type baseBalancer struct { func (b *baseBalancer) ResolverError(err error) { b.resolverErr = err - if len(b.subConns) == 0 { + if b.subConns.Len() == 0 { b.state = connectivity.TransientFailure } @@ -105,57 +99,32 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { // Successful resolution; clear resolver error and ensure we return nil. b.resolverErr = nil // addrsSet is the set converted from addrs, it's used for quick lookup of an address. - addrsSet := make(map[resolver.Address]struct{}) + addrsSet := resolver.NewAddressMap() for _, a := range s.ResolverState.Addresses { - // Strip attributes from addresses before using them as map keys. So - // that when two addresses only differ in attributes pointers (but with - // the same attribute content), they are considered the same address. - // - // Note that this doesn't handle the case where the attribute content is - // different. So if users want to set different attributes to create - // duplicate connections to the same backend, it doesn't work. This is - // fine for now, because duplicate is done by setting Metadata today. - // - // TODO: read attributes to handle duplicate connections. - aNoAttrs := a - aNoAttrs.Attributes = nil - addrsSet[aNoAttrs] = struct{}{} - if scInfo, ok := b.subConns[aNoAttrs]; !ok { + addrsSet.Set(a, nil) + if _, ok := b.subConns.Get(a); !ok { // a is a new address (not existing in b.subConns). - // - // When creating SubConn, the original address with attributes is - // passed through. So that connection configurations in attributes - // (like creds) will be used. sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck}) if err != nil { logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err) continue } - b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes} + b.subConns.Set(a, sc) b.scStates[sc] = connectivity.Idle b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle) sc.Connect() - } else { - // Always update the subconn's address in case the attributes - // changed. - // - // The SubConn does a reflect.DeepEqual of the new and old - // addresses. So this is a noop if the current address is the same - // as the old one (including attributes). - scInfo.attrs = a.Attributes - b.subConns[aNoAttrs] = scInfo - b.cc.UpdateAddresses(scInfo.subConn, []resolver.Address{a}) } } - for a, scInfo := range b.subConns { + b.subConns.Range(func(a resolver.Address, sci interface{}) { + sc := sci.(balancer.SubConn) // a was removed by resolver. - if _, ok := addrsSet[a]; !ok { - b.cc.RemoveSubConn(scInfo.subConn) - delete(b.subConns, a) + if _, ok := addrsSet.Get(a); !ok { + b.cc.RemoveSubConn(sc) + b.subConns.Delete(a) // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. // The entry will be deleted in UpdateSubConnState. } - } + }) // If resolver state contains no addresses, return an error so ClientConn // will trigger re-resolve. Also records this as an resolver error, so when // the overall state turns transient failure, the error message will have @@ -193,12 +162,12 @@ func (b *baseBalancer) regeneratePicker() { readySCs := make(map[balancer.SubConn]SubConnInfo) // Filter out all ready SCs from full subConn map. - for addr, scInfo := range b.subConns { - if st, ok := b.scStates[scInfo.subConn]; ok && st == connectivity.Ready { - addr.Attributes = scInfo.attrs - readySCs[scInfo.subConn] = SubConnInfo{Address: addr} + b.subConns.Range(func(addr resolver.Address, sci interface{}) { + sc := sci.(balancer.SubConn) + if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { + readySCs[sc] = SubConnInfo{Address: addr} } - } + }) b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs}) } diff --git a/balancer/base/balancer_test.go b/balancer/base/balancer_test.go index f8ff8cf9844..3a3ccd6ba71 100644 --- a/balancer/base/balancer_test.go +++ b/balancer/base/balancer_test.go @@ -54,34 +54,6 @@ func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker { return nil } -func TestBaseBalancerStripAttributes(t *testing.T) { - b := (&baseBuilder{}).Build(&testClientConn{ - newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) { - for _, addr := range addrs { - if addr.Attributes == nil { - t.Errorf("in NewSubConn, got address %+v with nil attributes, want not nil", addr) - } - } - return &testSubConn{}, nil - }, - }, balancer.BuildOptions{}).(*baseBalancer) - - b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{ - Addresses: []resolver.Address{ - {Addr: "1.1.1.1", Attributes: &attributes.Attributes{}}, - {Addr: "2.2.2.2", Attributes: &attributes.Attributes{}}, - }, - }, - }) - - for addr := range b.subConns { - if addr.Attributes != nil { - t.Errorf("in b.subConns, got address %+v with not nil attributes, want nil", addr) - } - } -} - func TestBaseBalancerReserveAttributes(t *testing.T) { var v = func(info PickerBuildInfo) { for _, sc := range info.ReadySCs { diff --git a/balancer/grpclb/state/state.go b/balancer/grpclb/state/state.go index a24264a34f5..4ecfa1c2151 100644 --- a/balancer/grpclb/state/state.go +++ b/balancer/grpclb/state/state.go @@ -39,7 +39,7 @@ type State struct { // Set returns a copy of the provided state with attributes containing s. s's // data should not be mutated after calling Set. func Set(state resolver.State, s *State) resolver.State { - state.Attributes = state.Attributes.WithValues(key, s) + state.Attributes = state.Attributes.WithValue(key, s) return state } diff --git a/balancer/roundrobin/roundrobin_test.go b/balancer/roundrobin/roundrobin_test.go index b89cdb4a30f..eb25055ff78 100644 --- a/balancer/roundrobin/roundrobin_test.go +++ b/balancer/roundrobin/roundrobin_test.go @@ -59,18 +59,26 @@ type testServer struct { testMDChan chan []string } -func newTestServer() *testServer { - return &testServer{testMDChan: make(chan []string, 1)} +func newTestServer(mdchan bool) *testServer { + t := &testServer{} + if mdchan { + t.testMDChan = make(chan []string, 1) + } + return t } func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + if s.testMDChan == nil { + return &testpb.Empty{}, nil + } md, ok := metadata.FromIncomingContext(ctx) - if ok && len(md[testMDKey]) != 0 { - select { - case s.testMDChan <- md[testMDKey]: - case <-ctx.Done(): - return nil, ctx.Err() - } + if !ok { + return nil, status.Errorf(codes.Internal, "no metadata in context") + } + select { + case s.testMDChan <- md[testMDKey]: + case <-ctx.Done(): + return nil, ctx.Err() } return &testpb.Empty{}, nil } @@ -91,7 +99,7 @@ func (t *test) cleanup() { } } -func startTestServers(count int) (_ *test, err error) { +func startTestServers(count int, mdchan bool) (_ *test, err error) { t := &test{} defer func() { @@ -106,7 +114,7 @@ func startTestServers(count int) (_ *test, err error) { } s := grpc.NewServer() - sImpl := newTestServer() + sImpl := newTestServer(mdchan) testpb.RegisterTestServiceServer(s, sImpl) t.servers = append(t.servers, s) t.serverImpls = append(t.serverImpls, sImpl) @@ -123,7 +131,7 @@ func startTestServers(count int) (_ *test, err error) { func (s) TestOneBackend(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - test, err := startTestServers(1) + test, err := startTestServers(1, false) if err != nil { t.Fatalf("failed to start servers: %v", err) } @@ -153,7 +161,7 @@ func (s) TestBackendsRoundRobin(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") backendCount := 5 - test, err := startTestServers(backendCount) + test, err := startTestServers(backendCount, false) if err != nil { t.Fatalf("failed to start servers: %v", err) } @@ -210,7 +218,7 @@ func (s) TestBackendsRoundRobin(t *testing.T) { func (s) TestAddressesRemoved(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - test, err := startTestServers(1) + test, err := startTestServers(1, false) if err != nil { t.Fatalf("failed to start servers: %v", err) } @@ -255,7 +263,7 @@ func (s) TestAddressesRemoved(t *testing.T) { func (s) TestCloseWithPendingRPC(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - test, err := startTestServers(1) + test, err := startTestServers(1, false) if err != nil { t.Fatalf("failed to start servers: %v", err) } @@ -287,7 +295,7 @@ func (s) TestCloseWithPendingRPC(t *testing.T) { func (s) TestNewAddressWhileBlocking(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - test, err := startTestServers(1) + test, err := startTestServers(1, false) if err != nil { t.Fatalf("failed to start servers: %v", err) } @@ -334,7 +342,7 @@ func (s) TestOneServerDown(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") backendCount := 3 - test, err := startTestServers(backendCount) + test, err := startTestServers(backendCount, false) if err != nil { t.Fatalf("failed to start servers: %v", err) } @@ -430,7 +438,7 @@ func (s) TestAllServersDown(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") backendCount := 3 - test, err := startTestServers(backendCount) + test, err := startTestServers(backendCount, false) if err != nil { t.Fatalf("failed to start servers: %v", err) } @@ -500,7 +508,7 @@ func (s) TestAllServersDown(t *testing.T) { func (s) TestUpdateAddressAttributes(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - test, err := startTestServers(1) + test, err := startTestServers(1, true) if err != nil { t.Fatalf("failed to start servers: %v", err) } @@ -512,23 +520,26 @@ func (s) TestUpdateAddressAttributes(t *testing.T) { } defer cc.Close() testc := testpb.NewTestServiceClient(cc) - // The first RPC should fail because there's no address. - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { + + // The first RPC should fail because there's no address. + ctxShort, cancel2 := context.WithTimeout(ctx, time.Millisecond) + defer cancel2() + if _, err := testc.EmptyCall(ctxShort, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) } r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}}) // The second RPC should succeed. - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("EmptyCall() = _, %v, want _, ", err) } // The second RPC should not set metadata, so there's no md in the channel. - select { - case md1 := <-test.serverImpls[0].testMDChan: + md1 := <-test.serverImpls[0].testMDChan + if md1 != nil { t.Fatalf("got md: %v, want empty metadata", md1) - case <-time.After(time.Microsecond * 100): } const testMDValue = "test-md-value" @@ -536,14 +547,21 @@ func (s) TestUpdateAddressAttributes(t *testing.T) { r.UpdateState(resolver.State{Addresses: []resolver.Address{ imetadata.Set(resolver.Address{Addr: test.addresses[0]}, metadata.Pairs(testMDKey, testMDValue)), }}) - // The third RPC should succeed. - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { - t.Fatalf("EmptyCall() = _, %v, want _, ", err) - } - // The third RPC should send metadata with it. - md2 := <-test.serverImpls[0].testMDChan - if len(md2) == 0 || md2[0] != testMDValue { - t.Fatalf("got md: %v, want %v", md2, []string{testMDValue}) + // A future RPC should send metadata with it. The update doesn't + // necessarily happen synchronously, so we wait some time before failing if + // some RPCs do not contain it. + for { + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil { + if status.Code(err) == codes.DeadlineExceeded { + t.Fatalf("timed out waiting for metadata in response") + } + t.Fatalf("EmptyCall() = _, %v, want _, ", err) + } + md2 := <-test.serverImpls[0].testMDChan + if len(md2) == 1 && md2[0] == testMDValue { + return + } + time.Sleep(10 * time.Millisecond) } } diff --git a/balancer/weightedroundrobin/weightedroundrobin.go b/balancer/weightedroundrobin/weightedroundrobin.go index 4b7d3bfedff..f15dddb5621 100644 --- a/balancer/weightedroundrobin/weightedroundrobin.go +++ b/balancer/weightedroundrobin/weightedroundrobin.go @@ -36,6 +36,12 @@ type AddrInfo struct { Weight uint32 } +// Equal allows the values to be compared by Attributes.Equal. +func (a AddrInfo) Equal(o interface{}) bool { + oa, ok := o.(AddrInfo) + return ok && oa.Weight == a.Weight +} + // SetAddrInfo returns a copy of addr in which the Attributes field is updated // with addrInfo. // @@ -44,7 +50,7 @@ type AddrInfo struct { // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address { - addr.Attributes = addr.Attributes.WithValues(attributeKey{}, addrInfo) + addr.BalancerAttributes = addr.BalancerAttributes.WithValue(attributeKey{}, addrInfo) return addr } @@ -55,7 +61,7 @@ func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address { // Notice: This API is EXPERIMENTAL and may be changed or removed in a // later release. func GetAddrInfo(addr resolver.Address) AddrInfo { - v := addr.Attributes.Value(attributeKey{}) + v := addr.BalancerAttributes.Value(attributeKey{}) ai, _ := v.(AddrInfo) return ai } diff --git a/balancer/weightedroundrobin/weightedwoundrobin_test.go b/balancer/weightedroundrobin/weightedroundrobin_test.go similarity index 97% rename from balancer/weightedroundrobin/weightedwoundrobin_test.go rename to balancer/weightedroundrobin/weightedroundrobin_test.go index aa46c449a13..d83619da2e6 100644 --- a/balancer/weightedroundrobin/weightedwoundrobin_test.go +++ b/balancer/weightedroundrobin/weightedroundrobin_test.go @@ -73,7 +73,7 @@ func TestAddrInfoToAndFromAttributes(t *testing.T) { } func TestGetAddInfoEmpty(t *testing.T) { - addr := resolver.Address{Attributes: attributes.New()} + addr := resolver.Address{} gotAddrInfo := GetAddrInfo(addr) wantAddrInfo := AddrInfo{} if !cmp.Equal(gotAddrInfo, wantAddrInfo) { diff --git a/internal/credentials/xds/handshake_info.go b/internal/credentials/xds/handshake_info.go index 6ef43cc89fa..9fa0c94f41e 100644 --- a/internal/credentials/xds/handshake_info.go +++ b/internal/credentials/xds/handshake_info.go @@ -43,10 +43,18 @@ func init() { // the Attributes field of resolver.Address. type handshakeAttrKey struct{} +// Equal reports whether the handshake info structs are identical (have the +// same pointer). This is sufficient as all subconns from one CDS balancer use +// the same one. +func (hi *HandshakeInfo) Equal(o interface{}) bool { + oh, ok := o.(*HandshakeInfo) + return ok && oh == hi +} + // SetHandshakeInfo returns a copy of addr in which the Attributes field is // updated with hInfo. func SetHandshakeInfo(addr resolver.Address, hInfo *HandshakeInfo) resolver.Address { - addr.Attributes = addr.Attributes.WithValues(handshakeAttrKey{}, hInfo) + addr.Attributes = addr.Attributes.WithValue(handshakeAttrKey{}, hInfo) return addr } diff --git a/internal/hierarchy/hierarchy.go b/internal/hierarchy/hierarchy.go index a2f990f552e..341d3405dc6 100644 --- a/internal/hierarchy/hierarchy.go +++ b/internal/hierarchy/hierarchy.go @@ -30,19 +30,37 @@ type pathKeyType string const pathKey = pathKeyType("grpc.internal.address.hierarchical_path") +type pathValue []string + +func (p pathValue) Equal(o interface{}) bool { + op, ok := o.(pathValue) + if !ok { + return false + } + if len(op) != len(p) { + return false + } + for i, v := range p { + if v != op[i] { + return false + } + } + return true +} + // Get returns the hierarchical path of addr. func Get(addr resolver.Address) []string { - attrs := addr.Attributes + attrs := addr.BalancerAttributes if attrs == nil { return nil } - path, _ := attrs.Value(pathKey).([]string) - return path + path, _ := attrs.Value(pathKey).(pathValue) + return ([]string)(path) } // Set overrides the hierarchical path in addr with path. func Set(addr resolver.Address, path []string) resolver.Address { - addr.Attributes = addr.Attributes.WithValues(pathKey, path) + addr.BalancerAttributes = addr.BalancerAttributes.WithValue(pathKey, pathValue(path)) return addr } diff --git a/internal/hierarchy/hierarchy_test.go b/internal/hierarchy/hierarchy_test.go index fc62f82b085..1043d5f81df 100644 --- a/internal/hierarchy/hierarchy_test.go +++ b/internal/hierarchy/hierarchy_test.go @@ -40,7 +40,7 @@ func TestGet(t *testing.T) { { name: "set", addr: resolver.Address{ - Attributes: attributes.New(pathKey, []string{"a", "b"}), + BalancerAttributes: attributes.New(pathKey, pathValue{"a", "b"}), }, want: []string{"a", "b"}, }, @@ -68,7 +68,7 @@ func TestSet(t *testing.T) { { name: "before is set", addr: resolver.Address{ - Attributes: attributes.New(pathKey, []string{"before", "a", "b"}), + BalancerAttributes: attributes.New(pathKey, pathValue{"before", "a", "b"}), }, path: []string{"a", "b"}, }, @@ -93,19 +93,19 @@ func TestGroup(t *testing.T) { { name: "all with hierarchy", addrs: []resolver.Address{ - {Addr: "a0", Attributes: attributes.New(pathKey, []string{"a"})}, - {Addr: "a1", Attributes: attributes.New(pathKey, []string{"a"})}, - {Addr: "b0", Attributes: attributes.New(pathKey, []string{"b"})}, - {Addr: "b1", Attributes: attributes.New(pathKey, []string{"b"})}, + {Addr: "a0", BalancerAttributes: attributes.New(pathKey, pathValue{"a"})}, + {Addr: "a1", BalancerAttributes: attributes.New(pathKey, pathValue{"a"})}, + {Addr: "b0", BalancerAttributes: attributes.New(pathKey, pathValue{"b"})}, + {Addr: "b1", BalancerAttributes: attributes.New(pathKey, pathValue{"b"})}, }, want: map[string][]resolver.Address{ "a": { - {Addr: "a0", Attributes: attributes.New(pathKey, []string{})}, - {Addr: "a1", Attributes: attributes.New(pathKey, []string{})}, + {Addr: "a0", BalancerAttributes: attributes.New(pathKey, pathValue{})}, + {Addr: "a1", BalancerAttributes: attributes.New(pathKey, pathValue{})}, }, "b": { - {Addr: "b0", Attributes: attributes.New(pathKey, []string{})}, - {Addr: "b1", Attributes: attributes.New(pathKey, []string{})}, + {Addr: "b0", BalancerAttributes: attributes.New(pathKey, pathValue{})}, + {Addr: "b1", BalancerAttributes: attributes.New(pathKey, pathValue{})}, }, }, }, @@ -113,15 +113,15 @@ func TestGroup(t *testing.T) { // Addresses without hierarchy are ignored. name: "without hierarchy", addrs: []resolver.Address{ - {Addr: "a0", Attributes: attributes.New(pathKey, []string{"a"})}, - {Addr: "a1", Attributes: attributes.New(pathKey, []string{"a"})}, - {Addr: "b0", Attributes: nil}, - {Addr: "b1", Attributes: nil}, + {Addr: "a0", BalancerAttributes: attributes.New(pathKey, pathValue{"a"})}, + {Addr: "a1", BalancerAttributes: attributes.New(pathKey, pathValue{"a"})}, + {Addr: "b0", BalancerAttributes: nil}, + {Addr: "b1", BalancerAttributes: nil}, }, want: map[string][]resolver.Address{ "a": { - {Addr: "a0", Attributes: attributes.New(pathKey, []string{})}, - {Addr: "a1", Attributes: attributes.New(pathKey, []string{})}, + {Addr: "a0", BalancerAttributes: attributes.New(pathKey, pathValue{})}, + {Addr: "a1", BalancerAttributes: attributes.New(pathKey, pathValue{})}, }, }, }, @@ -130,15 +130,15 @@ func TestGroup(t *testing.T) { // the address is ignored. name: "wrong type", addrs: []resolver.Address{ - {Addr: "a0", Attributes: attributes.New(pathKey, []string{"a"})}, - {Addr: "a1", Attributes: attributes.New(pathKey, []string{"a"})}, - {Addr: "b0", Attributes: attributes.New(pathKey, "b")}, - {Addr: "b1", Attributes: attributes.New(pathKey, 314)}, + {Addr: "a0", BalancerAttributes: attributes.New(pathKey, pathValue{"a"})}, + {Addr: "a1", BalancerAttributes: attributes.New(pathKey, pathValue{"a"})}, + {Addr: "b0", BalancerAttributes: attributes.New(pathKey, "b")}, + {Addr: "b1", BalancerAttributes: attributes.New(pathKey, 314)}, }, want: map[string][]resolver.Address{ "a": { - {Addr: "a0", Attributes: attributes.New(pathKey, []string{})}, - {Addr: "a1", Attributes: attributes.New(pathKey, []string{})}, + {Addr: "a0", BalancerAttributes: attributes.New(pathKey, pathValue{})}, + {Addr: "a1", BalancerAttributes: attributes.New(pathKey, pathValue{})}, }, }, }, @@ -167,14 +167,14 @@ func TestGroupE2E(t *testing.T) { var addrsWithHierarchy []resolver.Address for p, wts := range hierarchy { - path1 := []string{p} + path1 := pathValue{p} for wt, addrs := range wts { - path2 := append([]string(nil), path1...) + path2 := append(pathValue(nil), path1...) path2 = append(path2, wt) for _, addr := range addrs { a := resolver.Address{ - Addr: addr, - Attributes: attributes.New(pathKey, path2), + Addr: addr, + BalancerAttributes: attributes.New(pathKey, path2), } addrsWithHierarchy = append(addrsWithHierarchy, a) } diff --git a/internal/metadata/metadata.go b/internal/metadata/metadata.go index 302262613a0..b8733dbf340 100644 --- a/internal/metadata/metadata.go +++ b/internal/metadata/metadata.go @@ -30,14 +30,38 @@ type mdKeyType string const mdKey = mdKeyType("grpc.internal.address.metadata") +type mdValue metadata.MD + +func (m mdValue) Equal(o interface{}) bool { + om, ok := o.(mdValue) + if !ok { + return false + } + if len(m) != len(om) { + return false + } + for k, v := range m { + ov := om[k] + if len(ov) != len(v) { + return false + } + for i, ve := range v { + if ov[i] != ve { + return false + } + } + } + return true +} + // Get returns the metadata of addr. func Get(addr resolver.Address) metadata.MD { attrs := addr.Attributes if attrs == nil { return nil } - md, _ := attrs.Value(mdKey).(metadata.MD) - return md + md, _ := attrs.Value(mdKey).(mdValue) + return metadata.MD(md) } // Set sets (overrides) the metadata in addr. @@ -45,6 +69,6 @@ func Get(addr resolver.Address) metadata.MD { // When a SubConn is created with this address, the RPCs sent on it will all // have this metadata. func Set(addr resolver.Address, md metadata.MD) resolver.Address { - addr.Attributes = addr.Attributes.WithValues(mdKey, md) + addr.Attributes = addr.Attributes.WithValue(mdKey, mdValue(md)) return addr } diff --git a/internal/metadata/metadata_test.go b/internal/metadata/metadata_test.go index 68c2ca5808c..1aa0f9798e8 100644 --- a/internal/metadata/metadata_test.go +++ b/internal/metadata/metadata_test.go @@ -41,7 +41,7 @@ func TestGet(t *testing.T) { { name: "not set", addr: resolver.Address{ - Attributes: attributes.New(mdKey, metadata.Pairs("k", "v")), + Attributes: attributes.New(mdKey, mdValue(metadata.Pairs("k", "v"))), }, want: metadata.Pairs("k", "v"), }, @@ -69,7 +69,7 @@ func TestSet(t *testing.T) { { name: "set before", addr: resolver.Address{ - Attributes: attributes.New(mdKey, metadata.Pairs("bef", "ore")), + Attributes: attributes.New(mdKey, mdValue(metadata.Pairs("bef", "ore"))), }, md: metadata.Pairs("k", "v"), }, diff --git a/internal/resolver/config_selector.go b/internal/resolver/config_selector.go index be7e13d5859..c7a18a948ad 100644 --- a/internal/resolver/config_selector.go +++ b/internal/resolver/config_selector.go @@ -132,7 +132,7 @@ const csKey = csKeyType("grpc.internal.resolver.configSelector") // SetConfigSelector sets the config selector in state and returns the new // state. func SetConfigSelector(state resolver.State, cs ConfigSelector) resolver.State { - state.Attributes = state.Attributes.WithValues(csKey, cs) + state.Attributes = state.Attributes.WithValue(csKey, cs) return state } diff --git a/internal/transport/networktype/networktype.go b/internal/transport/networktype/networktype.go index 7bb53cff101..c11b5278274 100644 --- a/internal/transport/networktype/networktype.go +++ b/internal/transport/networktype/networktype.go @@ -31,7 +31,7 @@ const key = keyType("grpc.internal.transport.networktype") // Set returns a copy of the provided address with attributes containing networkType. func Set(address resolver.Address, networkType string) resolver.Address { - address.Attributes = address.Attributes.WithValues(key, networkType) + address.Attributes = address.Attributes.WithValue(key, networkType) return address } diff --git a/internal/xds_handshake_cluster.go b/internal/xds_handshake_cluster.go index 3677c3f04f8..e8b492774d1 100644 --- a/internal/xds_handshake_cluster.go +++ b/internal/xds_handshake_cluster.go @@ -28,7 +28,7 @@ type handshakeClusterNameKey struct{} // SetXDSHandshakeClusterName returns a copy of addr in which the Attributes field // is updated with the cluster name. func SetXDSHandshakeClusterName(addr resolver.Address, clusterName string) resolver.Address { - addr.Attributes = addr.Attributes.WithValues(handshakeClusterNameKey{}, clusterName) + addr.Attributes = addr.Attributes.WithValue(handshakeClusterNameKey{}, clusterName) return addr } diff --git a/resolver/map.go b/resolver/map.go new file mode 100644 index 00000000000..bfde61b331c --- /dev/null +++ b/resolver/map.go @@ -0,0 +1,103 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package resolver + +type addressMapEntry struct { + addr Address + value interface{} +} + +// AddressMap is a map of addresses to arbitrary values taking into account +// Attributes. BalancerAttributes are ignored, as are Metadata and Type. +// Multiple accesses may not be performed concurrently. Must be created via +// NewAddressMap; do not construct directly. +type AddressMap struct { + m map[string]addressMapEntryList +} + +type addressMapEntryList []*addressMapEntry + +// NewAddressMap creates a new AddressMap. +func NewAddressMap() *AddressMap { + return &AddressMap{m: make(map[string]addressMapEntryList)} +} + +// find returns the index of addr in the addressMapEntry slice, or -1 if not +// present. +func (l addressMapEntryList) find(addr Address) int { + if len(l) == 0 { + return -1 + } + for i, entry := range l { + if entry.addr.ServerName == addr.ServerName && + entry.addr.Attributes.Equal(addr.Attributes) { + return i + } + } + return -1 +} + +// Get returns the value for the address in the map, if present. +func (a *AddressMap) Get(addr Address) (value interface{}, ok bool) { + entryList := a.m[addr.Addr] + if entry := entryList.find(addr); entry != -1 { + return entryList[entry].value, true + } + return nil, false +} + +// Set updates or adds the value to the address in the map. +func (a *AddressMap) Set(addr Address, value interface{}) { + entryList := a.m[addr.Addr] + if entry := entryList.find(addr); entry != -1 { + a.m[addr.Addr][entry].value = value + return + } + a.m[addr.Addr] = append(a.m[addr.Addr], &addressMapEntry{addr: addr, value: value}) +} + +// Delete removes addr from the map. +func (a *AddressMap) Delete(addr Address) { + entryList := a.m[addr.Addr] + entry := entryList.find(addr) + if entry == -1 { + return + } + if len(entryList) == 1 { + entryList = nil + } else { + copy(entryList[entry:], entryList[entry+1:]) + entryList = entryList[:len(entryList)-1] + } + a.m[addr.Addr] = entryList +} + +// Len returns the number of entries in the map. +func (a *AddressMap) Len() int { + return len(a.m) +} + +// Range invokes f for each entry in the map. +func (a *AddressMap) Range(f func(addr Address, value interface{})) { + for _, entryList := range a.m { + for _, entry := range entryList { + f(entry.addr, entry.value) + } + } +} diff --git a/resolver/map_test.go b/resolver/map_test.go new file mode 100644 index 00000000000..86191d82bbb --- /dev/null +++ b/resolver/map_test.go @@ -0,0 +1,153 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package resolver + +import ( + "testing" + + "google.golang.org/grpc/attributes" +) + +// Note: each address is different from addr1 by one value. addr7 matches +// addr1, since the only difference is BalancerAttributes, which are not +// compared. +var ( + addr1 = Address{Addr: "a1", Attributes: attributes.New("a1", 3), ServerName: "s1"} + addr2 = Address{Addr: "a2", Attributes: attributes.New("a1", 3), ServerName: "s1"} + addr3 = Address{Addr: "a1", Attributes: attributes.New("a2", 3), ServerName: "s1"} + addr4 = Address{Addr: "a1", Attributes: attributes.New("a1", 2), ServerName: "s1"} + addr5 = Address{Addr: "a1", Attributes: attributes.New("a1", "3"), ServerName: "s1"} + addr6 = Address{Addr: "a1", Attributes: attributes.New("a1", 3), ServerName: "s2"} + addr7 = Address{Addr: "a1", Attributes: attributes.New("a1", 3), ServerName: "s1", BalancerAttributes: attributes.New("xx", 3)} +) + +func (s) TestAddressMap_Length(t *testing.T) { + addrMap := NewAddressMap() + if got := addrMap.Len(); got != 0 { + t.Fatalf("addrMap.Len() = %v; want 0", got) + } + for i := 0; i < 10; i++ { + addrMap.Set(addr1, nil) + if got, want := addrMap.Len(), 1; got != want { + t.Fatalf("addrMap.Len() = %v; want %v", got, want) + } + addrMap.Set(addr7, nil) // aliases addr1 + } + for i := 0; i < 10; i++ { + addrMap.Set(addr2, nil) + if got, want := addrMap.Len(), 2; got != want { + t.Fatalf("addrMap.Len() = %v; want %v", got, want) + } + } +} + +func (s) TestAddressMap_Get(t *testing.T) { + addrMap := NewAddressMap() + addrMap.Set(addr1, 1) + + if got, ok := addrMap.Get(addr2); ok || got != nil { + t.Fatalf("addrMap.Get(addr1) = %v, %v; want nil, false", got, ok) + } + + addrMap.Set(addr2, 2) + addrMap.Set(addr3, 3) + addrMap.Set(addr4, 4) + addrMap.Set(addr5, 5) + addrMap.Set(addr6, 6) + addrMap.Set(addr7, 7) // aliases addr1 + if got, ok := addrMap.Get(addr1); !ok || got.(int) != 7 { + t.Fatalf("addrMap.Get(addr1) = %v, %v; want %v, true", got, ok, 7) + } + if got, ok := addrMap.Get(addr2); !ok || got.(int) != 2 { + t.Fatalf("addrMap.Get(addr2) = %v, %v; want %v, true", got, ok, 2) + } + if got, ok := addrMap.Get(addr3); !ok || got.(int) != 3 { + t.Fatalf("addrMap.Get(addr3) = %v, %v; want %v, true", got, ok, 3) + } + if got, ok := addrMap.Get(addr4); !ok || got.(int) != 4 { + t.Fatalf("addrMap.Get(addr4) = %v, %v; want %v, true", got, ok, 4) + } + if got, ok := addrMap.Get(addr5); !ok || got.(int) != 5 { + t.Fatalf("addrMap.Get(addr5) = %v, %v; want %v, true", got, ok, 5) + } + if got, ok := addrMap.Get(addr6); !ok || got.(int) != 6 { + t.Fatalf("addrMap.Get(addr6) = %v, %v; want %v, true", got, ok, 6) + } + if got, ok := addrMap.Get(addr7); !ok || got.(int) != 7 { + t.Fatalf("addrMap.Get(addr7) = %v, %v; want %v, true", got, ok, 7) + } +} + +func (s) TestAddressMap_Delete(t *testing.T) { + addrMap := NewAddressMap() + addrMap.Set(addr1, 1) + addrMap.Set(addr2, 2) + if got, want := addrMap.Len(), 2; got != want { + t.Fatalf("addrMap.Len() = %v; want %v", got, want) + } + addrMap.Delete(addr3) + addrMap.Delete(addr4) + addrMap.Delete(addr5) + addrMap.Delete(addr6) + addrMap.Delete(addr7) // aliases addr1 + if got, ok := addrMap.Get(addr1); ok || got != nil { + t.Fatalf("addrMap.Get(addr1) = %v, %v; want nil, false", got, ok) + } + if got, ok := addrMap.Get(addr7); ok || got != nil { + t.Fatalf("addrMap.Get(addr7) = %v, %v; want nil, false", got, ok) + } + if got, ok := addrMap.Get(addr2); !ok || got.(int) != 2 { + t.Fatalf("addrMap.Get(addr2) = %v, %v; want %v, true", got, ok, 2) + } +} + +func (s) TestAddressMap_Range(t *testing.T) { + addrMap := NewAddressMap() + addrMap.Set(addr1, 1) + addrMap.Set(addr2, 2) + addrMap.Set(addr3, 3) + addrMap.Set(addr4, 4) + addrMap.Set(addr5, 5) + addrMap.Set(addr6, 6) + addrMap.Set(addr7, 7) // aliases addr1 + + want := map[int]bool{2: true, 3: true, 4: true, 5: true, 6: true, 7: true} + test := func(a1, a2 Address, n int, v interface{}) { + if a1.Addr == a2.Addr && a1.Attributes == a2.Attributes && a1.ServerName == a2.ServerName { + if ok := want[n]; !ok { + t.Fatal("matched address multiple times:", a1, n, want) + } + if n != v.(int) { + t.Fatalf("%v read value %v; want %v:", a1, v, n) + } + delete(want, n) + } + } + addrMap.Range(func(a Address, v interface{}) { + test(a, addr1, 7, v) + test(a, addr2, 2, v) + test(a, addr3, 3, v) + test(a, addr4, 4, v) + test(a, addr5, 5, v) + test(a, addr6, 6, v) + }) + if len(want) != 0 { + t.Fatalf("did not find expected addresses; remaining: %v", want) + } +} diff --git a/resolver/resolver.go b/resolver/resolver.go index 9116897b463..873b932b20d 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -117,9 +117,14 @@ type Address struct { ServerName string // Attributes contains arbitrary data about this address intended for - // consumption by the load balancing policy. + // consumption by the SubConn. Attributes *attributes.Attributes + // BalancerAttributes contains arbitrary data about this address intended + // for consumption by the LB policy. These attribes do not affect SubConn + // creation, connection establishment, handshaking, etc. + BalancerAttributes *attributes.Attributes + // Type is the type of this address. // // Deprecated: use Attributes instead. @@ -132,6 +137,15 @@ type Address struct { Metadata interface{} } +// Equal returns whether a and o are identical. Metadata is compared directly, +// not with any recursive introspection. +func (a *Address) Equal(o *Address) bool { + return a.Addr == o.Addr && a.ServerName == o.ServerName && + a.Attributes.Equal(o.Attributes) && + a.BalancerAttributes.Equal(o.BalancerAttributes) && + a.Type == o.Type && a.Metadata == o.Metadata +} + // BuildOptions includes additional information for the builder to create // the resolver. type BuildOptions struct { diff --git a/resolver/resolver_test.go b/resolver/resolver_test.go new file mode 100644 index 00000000000..8d061f9b66d --- /dev/null +++ b/resolver/resolver_test.go @@ -0,0 +1,33 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package resolver + +import ( + "testing" + + "google.golang.org/grpc/internal/grpctest" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go index d3475ea3f5d..191a5d56b69 100644 --- a/xds/internal/balancer/clustermanager/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/clustermanager_test.go @@ -85,7 +85,7 @@ type ignoreAttrsRRBalancer struct { func (trrb *ignoreAttrsRRBalancer) UpdateClientConnState(s balancer.ClientConnState) error { var newAddrs []resolver.Address for _, a := range s.ResolverState.Addresses { - a.Attributes = nil + a.BalancerAttributes = nil newAddrs = append(newAddrs, a) } s.ResolverState.Addresses = newAddrs @@ -137,8 +137,8 @@ func TestClusterPicks(t *testing.T) { // Send the config, and an address with hierarchy path ["cluster_1"]. wantAddrs := []resolver.Address{ - {Addr: testBackendAddrStrs[0], Attributes: nil}, - {Addr: testBackendAddrStrs[1], Attributes: nil}, + {Addr: testBackendAddrStrs[0], BalancerAttributes: nil}, + {Addr: testBackendAddrStrs[1], BalancerAttributes: nil}, } if err := rtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ @@ -156,11 +156,11 @@ func TestClusterPicks(t *testing.T) { for range wantAddrs { addrs := <-cc.NewSubConnAddrsCh if len(hierarchy.Get(addrs[0])) != 0 { - t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes) + t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes) } sc := <-cc.NewSubConnCh // Clear the attributes before adding to map. - addrs[0].Attributes = nil + addrs[0].BalancerAttributes = nil m1[addrs[0]] = sc rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) @@ -215,8 +215,8 @@ func TestConfigUpdateAddCluster(t *testing.T) { // Send the config, and an address with hierarchy path ["cluster_1"]. wantAddrs := []resolver.Address{ - {Addr: testBackendAddrStrs[0], Attributes: nil}, - {Addr: testBackendAddrStrs[1], Attributes: nil}, + {Addr: testBackendAddrStrs[0], BalancerAttributes: nil}, + {Addr: testBackendAddrStrs[1], BalancerAttributes: nil}, } if err := rtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ @@ -234,11 +234,11 @@ func TestConfigUpdateAddCluster(t *testing.T) { for range wantAddrs { addrs := <-cc.NewSubConnAddrsCh if len(hierarchy.Get(addrs[0])) != 0 { - t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes) + t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes) } sc := <-cc.NewSubConnCh // Clear the attributes before adding to map. - addrs[0].Attributes = nil + addrs[0].BalancerAttributes = nil m1[addrs[0]] = sc rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) @@ -285,7 +285,7 @@ func TestConfigUpdateAddCluster(t *testing.T) { if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } - wantAddrs = append(wantAddrs, resolver.Address{Addr: testBackendAddrStrs[2], Attributes: nil}) + wantAddrs = append(wantAddrs, resolver.Address{Addr: testBackendAddrStrs[2], BalancerAttributes: nil}) if err := rtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}), @@ -300,11 +300,11 @@ func TestConfigUpdateAddCluster(t *testing.T) { // Expect exactly one new subconn. addrs := <-cc.NewSubConnAddrsCh if len(hierarchy.Get(addrs[0])) != 0 { - t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes) + t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes) } sc := <-cc.NewSubConnCh // Clear the attributes before adding to map. - addrs[0].Attributes = nil + addrs[0].BalancerAttributes = nil m1[addrs[0]] = sc rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) @@ -372,8 +372,8 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) { // Send the config, and an address with hierarchy path ["cluster_1"]. wantAddrs := []resolver.Address{ - {Addr: testBackendAddrStrs[0], Attributes: nil}, - {Addr: testBackendAddrStrs[1], Attributes: nil}, + {Addr: testBackendAddrStrs[0], BalancerAttributes: nil}, + {Addr: testBackendAddrStrs[1], BalancerAttributes: nil}, } if err := rtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ @@ -391,11 +391,11 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) { for range wantAddrs { addrs := <-cc.NewSubConnAddrsCh if len(hierarchy.Get(addrs[0])) != 0 { - t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes) + t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes) } sc := <-cc.NewSubConnCh // Clear the attributes before adding to map. - addrs[0].Attributes = nil + addrs[0].BalancerAttributes = nil m1[addrs[0]] = sc rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) @@ -475,11 +475,11 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) { for range wantAddrs { addrs := <-cc.NewSubConnAddrsCh if len(hierarchy.Get(addrs[0])) != 0 { - t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes) + t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes) } sc := <-cc.NewSubConnCh // Clear the attributes before adding to map. - addrs[0].Attributes = nil + addrs[0].BalancerAttributes = nil m2[addrs[0]] = sc rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) @@ -608,7 +608,7 @@ func TestInitialIdle(t *testing.T) { // Send the config, and an address with hierarchy path ["cluster_1"]. wantAddrs := []resolver.Address{ - {Addr: testBackendAddrStrs[0], Attributes: nil}, + {Addr: testBackendAddrStrs[0], BalancerAttributes: nil}, } if err := rtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ diff --git a/xds/internal/internal.go b/xds/internal/internal.go index 0cccd382410..8df20a1f9c0 100644 --- a/xds/internal/internal.go +++ b/xds/internal/internal.go @@ -46,6 +46,15 @@ func (l LocalityID) ToString() (string, error) { return string(b), nil } +// Equal allows the values to be compared by Attributes.Equal. +func (l LocalityID) Equal(o interface{}) bool { + ol, ok := o.(LocalityID) + if !ok { + return false + } + return l.Region == ol.Region && l.Zone == ol.Zone && l.SubZone == ol.SubZone +} + // LocalityIDFromString converts a json representation of locality, into a // LocalityID struct. func LocalityIDFromString(s string) (ret LocalityID, _ error) { @@ -62,12 +71,12 @@ const localityKey = localityKeyType("grpc.xds.internal.address.locality") // GetLocalityID returns the locality ID of addr. func GetLocalityID(addr resolver.Address) LocalityID { - path, _ := addr.Attributes.Value(localityKey).(LocalityID) + path, _ := addr.BalancerAttributes.Value(localityKey).(LocalityID) return path } // SetLocalityID sets locality ID in addr to l. func SetLocalityID(addr resolver.Address, l LocalityID) resolver.Address { - addr.Attributes = addr.Attributes.WithValues(localityKey, l) + addr.BalancerAttributes = addr.BalancerAttributes.WithValue(localityKey, l) return addr } diff --git a/xds/internal/xdsclient/attributes.go b/xds/internal/xdsclient/attributes.go index d2357df0727..467c205a255 100644 --- a/xds/internal/xdsclient/attributes.go +++ b/xds/internal/xdsclient/attributes.go @@ -54,6 +54,6 @@ func FromResolverState(state resolver.State) XDSClient { // SetClient sets c in state and returns the new state. func SetClient(state resolver.State, c XDSClient) resolver.State { - state.Attributes = state.Attributes.WithValues(clientKey, c) + state.Attributes = state.Attributes.WithValue(clientKey, c) return state }