diff --git a/balancer/balancer.go b/balancer/balancer.go index f4f9408f3852..392b21fb2d8e 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -110,6 +110,11 @@ type SubConn interface { UpdateAddresses([]resolver.Address) // Connect starts the connecting for this SubConn. Connect() + // GetOrBuildProducer returns a reference to the existing Producer for this + // ProducerBuilder in this SubConn, or, if one does not currently exist, + // creates a new one and returns it. Returns a close function which must + // be called when the Producer is no longer needed. + GetOrBuildProducer(ProducerBuilder) (p Producer, close func()) } // NewSubConnOptions contains options to create new SubConn. @@ -371,3 +376,21 @@ type ClientConnState struct { // ErrBadResolverState may be returned by UpdateClientConnState to indicate a // problem with the provided name resolver data. var ErrBadResolverState = errors.New("bad resolver state") + +// A ProducerBuilder is a simple constructor for a Producer. It is used by the +// SubConn to create producers when needed. +type ProducerBuilder interface { + // Build creates a Producer. The first parameter is always a + // grpc.ClientConnInterface (a type to allow creating RPCs/streams on the + // associated SubConn), but is declared as interface{} to avoid a + // dependency cycle. Should also return a close function that will be + // called when all references to the Producer have been given up. + Build(grpcClientConnInterface interface{}) (p Producer, close func()) +} + +// A Producer is a type shared among potentially many consumers. It is +// associated with a SubConn, and an implementation will typically contain +// other methods to provide additional functionality, e.g. configuration or +// subscription registration. +type Producer interface { +} diff --git a/balancer/base/balancer_test.go b/balancer/base/balancer_test.go index 3a3ccd6ba71a..b50abf8526e6 100644 --- a/balancer/base/balancer_test.go +++ b/balancer/base/balancer_test.go @@ -44,6 +44,10 @@ func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {} func (sc *testSubConn) Connect() {} +func (sc *testSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) { + return nil, nil +} + // testPickBuilder creates balancer.Picker for test. type testPickBuilder struct { validate func(info PickerBuildInfo) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index b1c23eaae0db..0359956d36fa 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -19,17 +19,20 @@ package grpc import ( + "context" "fmt" "strings" "sync" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" ) // ccBalancerWrapper sits between the ClientConn and the Balancer. @@ -305,7 +308,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) return nil, err } - acbw := &acBalancerWrapper{ac: ac} + acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)} acbw.ac.mu.Lock() ac.acbw = acbw acbw.ac.mu.Unlock() @@ -359,8 +362,9 @@ func (ccb *ccBalancerWrapper) Target() string { // acBalancerWrapper is a wrapper on top of ac for balancers. // It implements balancer.SubConn interface. type acBalancerWrapper struct { - mu sync.Mutex - ac *addrConn + mu sync.Mutex + ac *addrConn + producers map[balancer.ProducerBuilder]*refCountedProducer } func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { @@ -414,3 +418,64 @@ func (acbw *acBalancerWrapper) getAddrConn() *addrConn { defer acbw.mu.Unlock() return acbw.ac } + +var errSubConnNotReady = status.Error(codes.Unavailable, "SubConn not currently connected") + +// NewStream begins a streaming RPC on the addrConn. If the addrConn is not +// ready, returns errSubConnNotReady. +func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { + transport := acbw.ac.getReadyTransport() + if transport == nil { + return nil, errSubConnNotReady + } + return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...) +} + +// Invoke performs a unary RPC. If the addrConn is not ready, returns +// errSubConnNotReady. +func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error { + cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...) + if err != nil { + return err + } + if err := cs.SendMsg(args); err != nil { + return err + } + return cs.RecvMsg(reply) +} + +type refCountedProducer struct { + producer balancer.Producer + refs int // number of current refs to the producer + close func() // underlying producer's close function +} + +func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) { + acbw.mu.Lock() + defer acbw.mu.Unlock() + + // Look up existing producer from this builder. + pData := acbw.producers[pb] + if pData == nil { + // Not found; create a new one and add it to the producers map. + p, close := pb.Build(acbw) + pData = &refCountedProducer{producer: p, close: close} + acbw.producers[pb] = pData + } + // Account for this new reference. + pData.refs++ + + // Return a cleanup function wrapped in a OnceFunc to remove this reference + // and delete the refCountedProducer from the map if the total reference + // count goes to zero. + unref := func() { + acbw.mu.Lock() + pData.refs-- + if pData.refs == 0 { + defer pData.close() // Run outside the acbw mutex + delete(acbw.producers, pb) + } + acbw.mu.Unlock() + } + return pData.producer, grpcsync.OnceFunc(unref) +} diff --git a/internal/testutils/balancer.go b/internal/testutils/balancer.go index 95ec79616eff..8927823d09da 100644 --- a/internal/testutils/balancer.go +++ b/internal/testutils/balancer.go @@ -68,6 +68,11 @@ func (tsc *TestSubConn) Connect() { } } +// GetOrBuildProducer is a no-op. +func (tsc *TestSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) { + return nil, nil +} + // String implements stringer to print human friendly error message. func (tsc *TestSubConn) String() string { return tsc.id diff --git a/orca/internal/internal.go b/orca/internal/internal.go index 882fd8287a9b..865d94d86945 100644 --- a/orca/internal/internal.go +++ b/orca/internal/internal.go @@ -20,8 +20,15 @@ // avoid polluting the godoc of the top-level orca package. package internal +import ibackoff "google.golang.org/grpc/internal/backoff" + // AllowAnyMinReportingInterval prevents clamping of the MinReportingInterval // configured via ServiceOptions, to a minimum of 30s. // // For testing purposes only. var AllowAnyMinReportingInterval interface{} // func(*ServiceOptions) + +// DefaultBackoffFunc is used by the producer to control its backoff behavior. +// +// For testing purposes only. +var DefaultBackoffFunc = ibackoff.DefaultExponential.Backoff diff --git a/orca/producer.go b/orca/producer.go new file mode 100644 index 000000000000..559033116667 --- /dev/null +++ b/orca/producer.go @@ -0,0 +1,221 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package orca + +import ( + "context" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/orca/internal" + "google.golang.org/grpc/status" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" + v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3" + v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3" + "google.golang.org/protobuf/types/known/durationpb" +) + +type producerBuilder struct{} + +// Build constructs and returns a producer and its cleanup function +func (*producerBuilder) Build(cci interface{}) (balancer.Producer, func()) { + ctx, cancel := context.WithCancel(context.Background()) + p := &producer{ + client: v3orcaservicegrpc.NewOpenRcaServiceClient(cci.(grpc.ClientConnInterface)), + closed: grpcsync.NewEvent(), + intervals: make(map[time.Duration]int), + listeners: make(map[OOBListener]struct{}), + backoff: internal.DefaultBackoffFunc, + } + go p.run(ctx) + return p, func() { + cancel() + <-p.closed.Done() // Block until stream stopped. + } +} + +var producerBuilderSingleton = &producerBuilder{} + +// OOBListener is used to receive out-of-band load reports as they arrive. +type OOBListener interface { + // OnLoadReport is called when a load report is received. + OnLoadReport(*v3orcapb.OrcaLoadReport) +} + +// OOBListenerOptions contains options to control how an OOBListener is called. +type OOBListenerOptions struct { + // ReportInterval specifies how often to request the server to provide a + // load report. May be provided less frequently if the server requires a + // longer interval, or may be provided more frequently if another + // subscriber requests a shorter interval. + ReportInterval time.Duration +} + +// RegisterOOBListener registers an out-of-band load report listener on sc. +// Any OOBListener may only be registered once per subchannel at a time. The +// returned stop function must be called when no longer needed. Do not +// register a single OOBListener more than once per SubConn. +func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOptions) (stop func()) { + pr, close := sc.GetOrBuildProducer(producerBuilderSingleton) + p := pr.(*producer) + p.registerListener(l, opts.ReportInterval) + + // TODO: When we can register for SubConn state updates, don't call run() + // until READY and automatically call stop() on SHUTDOWN. + + // If stop is called multiple times, prevent it from having any effect on + // subsequent calls. + return grpcsync.OnceFunc(func() { + p.unregisterListener(l, opts.ReportInterval) + close() + }) +} + +type producer struct { + client v3orcaservicegrpc.OpenRcaServiceClient + + closed *grpcsync.Event // fired when closure completes + // backoff is called between stream attempts to determine how long to delay + // to avoid overloading a server experiencing problems. The attempt count + // is incremented when stream errors occur and is reset when the stream + // reports a result. + backoff func(int) time.Duration + + mu sync.Mutex + intervals map[time.Duration]int // map from interval time to count of listeners requesting that time + listeners map[OOBListener]struct{} // set of registered listeners +} + +// registerListener adds the listener and its requested report interval to the +// producer. +func (p *producer) registerListener(l OOBListener, interval time.Duration) { + p.mu.Lock() + defer p.mu.Unlock() + p.listeners[l] = struct{}{} + p.intervals[interval]++ +} + +// registerListener removes the listener and its requested report interval to +// the producer. +func (p *producer) unregisterListener(l OOBListener, interval time.Duration) { + p.mu.Lock() + defer p.mu.Unlock() + delete(p.listeners, l) + p.intervals[interval]-- + if p.intervals[interval] == 0 { + delete(p.intervals, interval) + } +} + +// minInterval returns the smallest key in p.intervals. +func (p *producer) minInterval() time.Duration { + p.mu.Lock() + defer p.mu.Unlock() + var min time.Duration + first := true + for t := range p.intervals { + if t < min || first { + min = t + first = false + } + } + return min +} + +// run manages the ORCA OOB stream on the subchannel. +func (p *producer) run(ctx context.Context) { + defer p.closed.Fire() + backoffAttempt := 0 + backoffTimer := time.NewTimer(0) + for ctx.Err() == nil { + select { + case <-backoffTimer.C: + case <-ctx.Done(): + return + } + + resetBackoff, err := p.runStream(ctx) + + if resetBackoff { + backoffTimer.Reset(0) + backoffAttempt = 0 + } else { + backoffTimer.Reset(p.backoff(backoffAttempt)) + backoffAttempt++ + } + + switch { + case err == nil: + // No error was encountered; restart the stream. + case ctx.Err() != nil: + // Producer was stopped; exit immediately and without logging an + // error. + return + case status.Code(err) == codes.Unimplemented: + // Unimplemented; do not retry. + logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.") + return + case status.Code(err) == codes.Unavailable: + // The SubConn is not currently ready; backoff silently. + // + // TODO: don't attempt the stream until the state is READY to + // minimize the chances of this case and to avoid using the + // exponential backoff mechanism, as we should know it's safe to + // retry when the state is READY again. + default: + // Log all other errors. + logger.Error("Received unexpected stream error:", err) + } + } +} + +// runStream runs a single stream on the subchannel and returns the resulting +// error, if any, and whether or not the run loop should reset the backoff +// timer to zero or advance it. +func (p *producer) runStream(ctx context.Context) (resetBackoff bool, err error) { + interval := p.minInterval() + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + stream, err := p.client.StreamCoreMetrics(streamCtx, &v3orcaservicepb.OrcaLoadReportRequest{ + ReportInterval: durationpb.New(interval), + }) + if err != nil { + return false, err + } + + for { + report, err := stream.Recv() + if err != nil { + return resetBackoff, err + } + resetBackoff = true + p.mu.Lock() + for l := range p.listeners { + l.OnLoadReport(report) + } + p.mu.Unlock() + if interval != p.minInterval() { + // restart stream to use new interval + return true, nil + } + } +} diff --git a/orca/producer_test.go b/orca/producer_test.go new file mode 100644 index 000000000000..f15317995dec --- /dev/null +++ b/orca/producer_test.go @@ -0,0 +1,549 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package orca_test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/orca" + "google.golang.org/grpc/orca/internal" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/status" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" + v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3" + v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3" +) + +// customLBB wraps a round robin LB policy but provides a ClientConn wrapper to +// add an ORCA OOB report producer for all created SubConns. +type customLBB struct{} + +func (customLBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + return balancer.Get(roundrobin.Name).Build(&ccWrapper{ClientConn: cc}, opts) +} + +func (customLBB) Name() string { return "customLB" } + +func init() { + balancer.Register(customLBB{}) +} + +type ccWrapper struct { + balancer.ClientConn +} + +func (w *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + if len(addrs) != 1 { + panic(fmt.Sprintf("got addrs=%v; want len(addrs) == 1", addrs)) + } + sc, err := w.ClientConn.NewSubConn(addrs, opts) + if err != nil { + return sc, err + } + l := getListenerInfo(addrs[0]) + l.listener.cleanup = orca.RegisterOOBListener(sc, l.listener, l.opts) + l.sc = sc + return sc, nil +} + +// listenerInfo is stored in an address's attributes to allow ORCA +// listeners to be registered on subconns created for that address. +type listenerInfo struct { + listener *testOOBListener + opts orca.OOBListenerOptions + sc balancer.SubConn // Set by the LB policy +} + +type listenerInfoKey struct{} + +func setListenerInfo(addr resolver.Address, l *listenerInfo) resolver.Address { + addr.Attributes = addr.Attributes.WithValue(listenerInfoKey{}, l) + return addr +} + +func getListenerInfo(addr resolver.Address) *listenerInfo { + return addr.Attributes.Value(listenerInfoKey{}).(*listenerInfo) +} + +// testOOBListener is a simple listener that pushes load reports to a channel. +type testOOBListener struct { + cleanup func() + loadReportCh chan *v3orcapb.OrcaLoadReport +} + +func newTestOOBListener() *testOOBListener { + return &testOOBListener{cleanup: func() {}, loadReportCh: make(chan *v3orcapb.OrcaLoadReport)} +} + +func (t *testOOBListener) Stop() { t.cleanup() } + +func (t *testOOBListener) OnLoadReport(r *v3orcapb.OrcaLoadReport) { + t.loadReportCh <- r +} + +// TestProducer is a basic, end-to-end style test of an LB policy with an +// OOBListener communicating with a server with an ORCA service. +func (s) TestProducer(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Use a fixed backoff for stream recreation. + oldBackoff := internal.DefaultBackoffFunc + internal.DefaultBackoffFunc = func(int) time.Duration { return 10 * time.Millisecond } + defer func() { internal.DefaultBackoffFunc = oldBackoff }() + + // Initialize listener for our ORCA server. + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + + // Register the OpenRCAService with a very short metrics reporting interval. + const shortReportingInterval = 50 * time.Millisecond + opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval} + internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts) + s := grpc.NewServer() + orcaSrv, err := orca.Register(s, opts) + if err != nil { + t.Fatalf("orca.Register failed: %v", err) + } + go s.Serve(lis) + defer s.Stop() + + // Create our client with an OOB listener in the LB policy it selects. + r := manual.NewBuilderWithScheme("whatever") + oobLis := newTestOOBListener() + + lisOpts := orca.OOBListenerOptions{ReportInterval: 50 * time.Millisecond} + li := &listenerInfo{listener: oobLis, opts: lisOpts} + addr := setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li) + r.InitialState(resolver.State{Addresses: []resolver.Address{addr}}) + cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial failed: %v", err) + } + defer cc.Close() + + // Ensure the OOB listener is stopped before the client is closed to avoid + // a potential irrelevant error in the logs. + defer oobLis.Stop() + + // Set a few metrics and wait for them on the client side. + orcaSrv.SetCPUUtilization(10) + orcaSrv.SetMemoryUtilization(100) + orcaSrv.SetUtilization("bob", 555) + loadReportWant := &v3orcapb.OrcaLoadReport{ + CpuUtilization: 10, + MemUtilization: 100, + Utilization: map[string]float64{"bob": 555}, + } + +testReport: + for { + select { + case r := <-oobLis.loadReportCh: + t.Log("Load report received: ", r) + if proto.Equal(r, loadReportWant) { + // Success! + break testReport + } + case <-ctx.Done(): + t.Fatalf("timed out waiting for load report: %v", loadReportWant) + } + } + + // Change and add metrics and wait for them on the client side. + orcaSrv.SetCPUUtilization(50) + orcaSrv.SetMemoryUtilization(200) + orcaSrv.SetUtilization("mary", 321) + loadReportWant = &v3orcapb.OrcaLoadReport{ + CpuUtilization: 50, + MemUtilization: 200, + Utilization: map[string]float64{"bob": 555, "mary": 321}, + } + + for { + select { + case r := <-oobLis.loadReportCh: + t.Log("Load report received: ", r) + if proto.Equal(r, loadReportWant) { + // Success! + return + } + case <-ctx.Done(): + t.Fatalf("timed out waiting for load report: %v", loadReportWant) + } + } +} + +// fakeORCAService is a simple implementation of an ORCA service that pushes +// requests it receives from clients to a channel and sends responses from a +// channel back. This allows tests to verify the client is sending requests +// and processing responses properly. +type fakeORCAService struct { + v3orcaservicegrpc.UnimplementedOpenRcaServiceServer + + reqCh chan *v3orcaservicepb.OrcaLoadReportRequest + respCh chan interface{} // either *v3orcapb.OrcaLoadReport or error +} + +func newFakeORCAService() *fakeORCAService { + return &fakeORCAService{ + reqCh: make(chan *v3orcaservicepb.OrcaLoadReportRequest), + respCh: make(chan interface{}), + } +} + +func (f *fakeORCAService) close() { + close(f.respCh) +} + +func (f *fakeORCAService) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error { + f.reqCh <- req + for resp := range f.respCh { + if err, ok := resp.(error); ok { + return err + } + if err := stream.Send(resp.(*v3orcapb.OrcaLoadReport)); err != nil { + // In the event that a stream error occurs, a new stream will have + // been created that was waiting for this response message. Push + // it back onto the channel and return. + // + // This happens because we range over respCh. If we changed to + // instead select on respCh + stream.Context(), the same situation + // could still occur due to a race between noticing the two events, + // so such a workaround would still be needed to prevent flakiness. + f.respCh <- resp + return err + } + } + return nil +} + +// TestProducerBackoff verifies that the ORCA producer applies the proper +// backoff after stream failures. +func (s) TestProducerBackoff(t *testing.T) { + grpctest.TLogger.ExpectErrorN("injected error", 4) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Provide a convenient way to expect backoff calls and return a minimal + // value. + const backoffShouldNotBeCalled = 9999 // Use to assert backoff function is not called. + const backoffAllowAny = -1 // Use to ignore any backoff calls. + expectedBackoff := backoffAllowAny + oldBackoff := internal.DefaultBackoffFunc + internal.DefaultBackoffFunc = func(got int) time.Duration { + if expectedBackoff == backoffShouldNotBeCalled { + t.Errorf("Unexpected backoff call; parameter = %v", got) + } else if expectedBackoff != backoffAllowAny { + if got != expectedBackoff { + t.Errorf("Unexpected backoff received; got %v want %v", got, expectedBackoff) + } + } + return time.Millisecond + } + defer func() { internal.DefaultBackoffFunc = oldBackoff }() + + // Initialize listener for our ORCA server. + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + + // Register our fake ORCA service. + s := grpc.NewServer() + fake := newFakeORCAService() + defer fake.close() + v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, fake) + go s.Serve(lis) + defer s.Stop() + + // Define the report interval and a function to wait for it to be sent to + // the server. + const reportInterval = 123 * time.Second + awaitRequest := func(interval time.Duration) { + select { + case req := <-fake.reqCh: + if got := req.GetReportInterval().AsDuration(); got != interval { + t.Errorf("Unexpected report interval; got %v want %v", got, interval) + } + case <-ctx.Done(): + t.Fatalf("Did not receive client request") + } + } + + // Create our client with an OOB listener in the LB policy it selects. + r := manual.NewBuilderWithScheme("whatever") + oobLis := newTestOOBListener() + + lisOpts := orca.OOBListenerOptions{ReportInterval: reportInterval} + li := &listenerInfo{listener: oobLis, opts: lisOpts} + r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}}) + cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial failed: %v", err) + } + defer cc.Close() + + // Ensure the OOB listener is stopped before the client is closed to avoid + // a potential irrelevant error in the logs. + defer oobLis.Stop() + + // Define a load report to send and expect the client to see. + loadReportWant := &v3orcapb.OrcaLoadReport{ + CpuUtilization: 10, + MemUtilization: 100, + Utilization: map[string]float64{"bob": 555}, + } + + // Unblock the fake. + awaitRequest(reportInterval) + fake.respCh <- loadReportWant + select { + case r := <-oobLis.loadReportCh: + t.Log("Load report received: ", r) + if proto.Equal(r, loadReportWant) { + // Success! + break + } + case <-ctx.Done(): + t.Fatalf("timed out waiting for load report: %v", loadReportWant) + } + + // The next request should be immediate, since there was a message + // received. + expectedBackoff = backoffShouldNotBeCalled + fake.respCh <- status.Errorf(codes.Internal, "injected error") + awaitRequest(reportInterval) + + // The next requests will need to backoff. + expectedBackoff = 0 + fake.respCh <- status.Errorf(codes.Internal, "injected error") + awaitRequest(reportInterval) + expectedBackoff = 1 + fake.respCh <- status.Errorf(codes.Internal, "injected error") + awaitRequest(reportInterval) + expectedBackoff = 2 + fake.respCh <- status.Errorf(codes.Internal, "injected error") + awaitRequest(reportInterval) + // The next request should be immediate, since there was a message + // received. + expectedBackoff = backoffShouldNotBeCalled + + // Send another valid response and wait for it on the client. + fake.respCh <- loadReportWant + select { + case r := <-oobLis.loadReportCh: + t.Log("Load report received: ", r) + if proto.Equal(r, loadReportWant) { + // Success! + break + } + case <-ctx.Done(): + t.Fatalf("timed out waiting for load report: %v", loadReportWant) + } +} + +// TestProducerMultipleListeners tests that multiple listeners works as +// expected in a producer: requesting the proper interval and delivering the +// update to all listeners. +func (s) TestProducerMultipleListeners(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Provide a convenient way to expect backoff calls and return a minimal + // value. + oldBackoff := internal.DefaultBackoffFunc + internal.DefaultBackoffFunc = func(got int) time.Duration { + return time.Millisecond + } + defer func() { internal.DefaultBackoffFunc = oldBackoff }() + + // Initialize listener for our ORCA server. + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + + // Register our fake ORCA service. + s := grpc.NewServer() + fake := newFakeORCAService() + defer fake.close() + v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, fake) + go s.Serve(lis) + defer s.Stop() + + // Define the report interval and a function to wait for it to be sent to + // the server. + const reportInterval1 = 123 * time.Second + const reportInterval2 = 234 * time.Second + const reportInterval3 = 56 * time.Second + awaitRequest := func(interval time.Duration) { + select { + case req := <-fake.reqCh: + if got := req.GetReportInterval().AsDuration(); got != interval { + t.Errorf("Unexpected report interval; got %v want %v", got, interval) + } + case <-ctx.Done(): + t.Fatalf("Did not receive client request") + } + } + + // Create our client with an OOB listener in the LB policy it selects. + r := manual.NewBuilderWithScheme("whatever") + oobLis1 := newTestOOBListener() + lisOpts1 := orca.OOBListenerOptions{ReportInterval: reportInterval1} + li := &listenerInfo{listener: oobLis1, opts: lisOpts1} + r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}}) + cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial failed: %v", err) + } + defer cc.Close() + + // Ensure the OOB listener is stopped before the client is closed to avoid + // a potential irrelevant error in the logs. + defer oobLis1.Stop() + + oobLis2 := newTestOOBListener() + lisOpts2 := orca.OOBListenerOptions{ReportInterval: reportInterval2} + + oobLis3 := newTestOOBListener() + lisOpts3 := orca.OOBListenerOptions{ReportInterval: reportInterval3} + + // Define a load report to send and expect the client to see. + loadReportWant := &v3orcapb.OrcaLoadReport{ + CpuUtilization: 10, + MemUtilization: 100, + Utilization: map[string]float64{"bob": 555}, + } + + // Receive reports and update counts for the three listeners. + var reportsMu sync.Mutex + var reportsReceived1, reportsReceived2, reportsReceived3 int + go func() { + for { + select { + case r := <-oobLis1.loadReportCh: + t.Log("Load report 1 received: ", r) + if !proto.Equal(r, loadReportWant) { + t.Errorf("Unexpected report received: %+v", r) + } + reportsMu.Lock() + reportsReceived1++ + reportsMu.Unlock() + case r := <-oobLis2.loadReportCh: + t.Log("Load report 2 received: ", r) + if !proto.Equal(r, loadReportWant) { + t.Errorf("Unexpected report received: %+v", r) + } + reportsMu.Lock() + reportsReceived2++ + reportsMu.Unlock() + case r := <-oobLis3.loadReportCh: + t.Log("Load report 3 received: ", r) + if !proto.Equal(r, loadReportWant) { + t.Errorf("Unexpected report received: %+v", r) + } + reportsMu.Lock() + reportsReceived3++ + reportsMu.Unlock() + case <-ctx.Done(): + // Test has ended; exit + return + } + } + }() + + // checkReports is a helper function to check the report counts for the three listeners. + checkReports := func(r1, r2, r3 int) { + t.Helper() + for ctx.Err() == nil { + reportsMu.Lock() + if r1 == reportsReceived1 && r2 == reportsReceived2 && r3 == reportsReceived3 { + // Success! + reportsMu.Unlock() + return + } + if reportsReceived1 > r1 || reportsReceived2 > r2 || reportsReceived3 > r3 { + reportsMu.Unlock() + t.Fatalf("received excess reports. got %v %v %v; want %v %v %v", reportsReceived1, reportsReceived2, reportsReceived3, r1, r2, r3) + return + } + reportsMu.Unlock() + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("timed out waiting for reports received. got %v %v %v; want %v %v %v", reportsReceived1, reportsReceived2, reportsReceived3, r1, r2, r3) + } + + // Only 1 listener; expect reportInterval1 to be used and expect the report + // to be sent to the listener. + awaitRequest(reportInterval1) + fake.respCh <- loadReportWant + checkReports(1, 0, 0) + + // Register listener 2 with a less frequent interval; no need to recreate + // stream. Report should go to both listeners. + oobLis2.cleanup = orca.RegisterOOBListener(li.sc, oobLis2, lisOpts2) + fake.respCh <- loadReportWant + checkReports(2, 1, 0) + + // Register listener 3 with a more frequent interval; stream is recreated + // with this interval after the next report is received. The first report + // will go to all three listeners. + oobLis3.cleanup = orca.RegisterOOBListener(li.sc, oobLis3, lisOpts3) + fake.respCh <- loadReportWant + checkReports(3, 2, 1) + awaitRequest(reportInterval3) + + // Another report without a change in listeners should go to all three listeners. + fake.respCh <- loadReportWant + checkReports(4, 3, 2) + + // Stop listener 2. This does not affect the interval as listener 3 is + // still the shortest. The next update goes to listeners 1 and 3. + oobLis2.Stop() + fake.respCh <- loadReportWant + checkReports(5, 3, 3) + + // Stop listener 3. This makes the interval longer, with stream recreation + // delayed until the next report is received. Reports should only go to + // listener 1 now. + oobLis3.Stop() + fake.respCh <- loadReportWant + checkReports(6, 3, 3) + awaitRequest(reportInterval1) + // Another report without a change in listeners should go to the first listener. + fake.respCh <- loadReportWant + checkReports(7, 3, 3) +} diff --git a/xds/internal/balancer/clusterresolver/clusterresolver_test.go b/xds/internal/balancer/clusterresolver/clusterresolver_test.go index 1973e1549188..c2a5729e3bcb 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver_test.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver_test.go @@ -190,6 +190,9 @@ type fakeSubConn struct{} func (*fakeSubConn) UpdateAddresses([]resolver.Address) { panic("implement me") } func (*fakeSubConn) Connect() { panic("implement me") } +func (*fakeSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) { + panic("implement me") +} // waitForNewChildLB makes sure that a new child LB is created by the top-level // clusterResolverBalancer.