diff --git a/api/src/main/java/io/grpc/SynchronizationContext.java b/api/src/main/java/io/grpc/SynchronizationContext.java index 03d26b55f0a..fe4243ec227 100644 --- a/api/src/main/java/io/grpc/SynchronizationContext.java +++ b/api/src/main/java/io/grpc/SynchronizationContext.java @@ -163,6 +163,38 @@ public String toString() { return new ScheduledHandle(runnable, future); } + /** + * Schedules a task to be added and run via {@link #execute} after an inital delay and then + * repeated after the delay until cancelled. + * + * @param task the task being scheduled + * @param initialDelay the delay before the first run + * @param delay the delay after the first run. + * @param unit the time unit for the delay + * @param timerService the {@code ScheduledExecutorService} that provides delayed execution + * + * @return an object for checking the status and/or cancel the scheduled task + */ + public final ScheduledHandle scheduleWithFixedDelay( + final Runnable task, long initialDelay, long delay, TimeUnit unit, + ScheduledExecutorService timerService) { + final ManagedRunnable runnable = new ManagedRunnable(task); + ScheduledFuture future = timerService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + execute(runnable); + } + + @Override + public String toString() { + return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay + + ")"; + } + }, initialDelay, delay, unit); + return new ScheduledHandle(runnable, future); + } + + private static class ManagedRunnable implements Runnable { final Runnable task; boolean isCancelled; diff --git a/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java b/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java index e8588e5e8d8..3debc871121 100644 --- a/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java +++ b/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java @@ -41,7 +41,7 @@ public void getClassesViaHardcoded_classesPresent() throws Exception { @Test public void stockProviders() { LoadBalancerRegistry defaultRegistry = LoadBalancerRegistry.getDefaultRegistry(); - assertThat(defaultRegistry.providers()).hasSize(3); + assertThat(defaultRegistry.providers()).hasSize(4); LoadBalancerProvider pickFirst = defaultRegistry.getProvider("pick_first"); assertThat(pickFirst).isInstanceOf(PickFirstLoadBalancerProvider.class); @@ -52,6 +52,12 @@ public void stockProviders() { "io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider"); assertThat(roundRobin.getPriority()).isEqualTo(5); + LoadBalancerProvider outlierDetection = defaultRegistry.getProvider( + "outlier_detection_experimental"); + assertThat(outlierDetection.getClass().getName()).isEqualTo( + "io.grpc.util.OutlierDetectionLoadBalancerProvider"); + assertThat(roundRobin.getPriority()).isEqualTo(5); + LoadBalancerProvider grpclb = defaultRegistry.getProvider("grpclb"); assertThat(grpclb).isInstanceOf(GrpclbLoadBalancerProvider.class); assertThat(grpclb.getPriority()).isEqualTo(5); diff --git a/core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java b/core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java new file mode 100644 index 00000000000..3fb32fdb078 --- /dev/null +++ b/core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java @@ -0,0 +1,1067 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ForwardingMap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.grpc.Attributes; +import io.grpc.ClientStreamTracer; +import io.grpc.ClientStreamTracer.StreamInfo; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.internal.TimeProvider; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; + +/** + * Wraps a child {@code LoadBalancer} while monitoring for outlier backends and removing them from + * the use of the child LB. + * + *

This implements the outlier detection gRFC: + * https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md + */ +public class OutlierDetectionLoadBalancer extends LoadBalancer { + + @VisibleForTesting + final AddressTrackerMap trackerMap; + + private final SynchronizationContext syncContext; + private final Helper childHelper; + private final GracefulSwitchLoadBalancer switchLb; + private TimeProvider timeProvider; + private final ScheduledExecutorService timeService; + private ScheduledHandle detectionTimerHandle; + private Long detectionTimerStartNanos; + + private static final Attributes.Key ADDRESS_TRACKER_ATTR_KEY + = Attributes.Key.create("addressTrackerKey"); + + /** + * Creates a new instance of {@link OutlierDetectionLoadBalancer}. + */ + public OutlierDetectionLoadBalancer(Helper helper, TimeProvider timeProvider) { + childHelper = new ChildHelper(checkNotNull(helper, "helper")); + switchLb = new GracefulSwitchLoadBalancer(childHelper); + trackerMap = new AddressTrackerMap(); + this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); + this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService"); + this.timeProvider = timeProvider; + } + + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + OutlierDetectionLoadBalancerConfig config + = (OutlierDetectionLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + + // The map should only retain entries for addresses in this latest update. + ArrayList addresses = new ArrayList<>(); + for (EquivalentAddressGroup addressGroup : resolvedAddresses.getAddresses()) { + addresses.addAll(addressGroup.getAddresses()); + } + trackerMap.keySet().retainAll(addresses); + + trackerMap.updateTrackerConfigs(config); + + // Add any new ones. + trackerMap.putNewTrackers(config, addresses); + + switchLb.switchTo(config.childPolicy.getProvider()); + + // If outlier detection is actually configured, start a timer that will periodically try to + // detect outliers. + if (config.outlierDetectionEnabled()) { + Long initialDelayNanos; + + if (detectionTimerStartNanos == null) { + // On the first go we use the configured interval. + initialDelayNanos = config.intervalNanos; + } else { + // If a timer has started earlier we cancel it and use the difference between the start + // time and now as the interval. + initialDelayNanos = Math.max(0L, + config.intervalNanos - (timeProvider.currentTimeNanos() - detectionTimerStartNanos)); + } + + // If a timer has been previously created we need to cancel it and reset all the call counters + // for a fresh start. + if (detectionTimerHandle != null) { + detectionTimerHandle.cancel(); + trackerMap.resetCallCounters(); + } + + detectionTimerHandle = syncContext.scheduleWithFixedDelay(new DetectionTimer(config), + initialDelayNanos, config.intervalNanos, NANOSECONDS, timeService); + } else if (detectionTimerHandle != null) { + // Outlier detection is not configured, but we have a lingering timer. Let's cancel it and + // uneject any addresses we may have ejected. + detectionTimerHandle.cancel(); + detectionTimerStartNanos = null; + trackerMap.cancelTracking(); + } + + switchLb.handleResolvedAddresses(resolvedAddresses); + } + + @Override + public void handleNameResolutionError(Status error) { + switchLb.handleNameResolutionError(error); + } + + @Override + public void shutdown() { + switchLb.shutdown(); + } + + /** + * This timer will be invoked periodically, according to configuration, and it will look for any + * outlier subchannels. + */ + class DetectionTimer implements Runnable { + + OutlierDetectionLoadBalancerConfig config; + + DetectionTimer(OutlierDetectionLoadBalancerConfig config) { + this.config = config; + } + + @Override + public void run() { + detectionTimerStartNanos = timeProvider.currentTimeNanos(); + + trackerMap.swapCounters(); + + for (OutlierEjectionAlgorithm algo : OutlierEjectionAlgorithm.forConfig(config)) { + algo.ejectOutliers(trackerMap, detectionTimerStartNanos); + } + + trackerMap.maybeUnejectOutliers(detectionTimerStartNanos); + } + } + + /** + * This child helper wraps the provided helper so that it can hand out wrapped {@link + * OutlierDetectionSubchannel}s and manage the address info map. + */ + class ChildHelper extends ForwardingLoadBalancerHelper { + + private Helper delegate; + + ChildHelper(Helper delegate) { + this.delegate = delegate; + } + + @Override + protected Helper delegate() { + return delegate; + } + + @Override + public Subchannel createSubchannel(CreateSubchannelArgs args) { + // Subchannels are wrapped so that we can monitor call results and to trigger failures when + // we decide to eject the subchannel. + OutlierDetectionSubchannel subchannel = new OutlierDetectionSubchannel( + delegate.createSubchannel(args)); + + // If the subchannel is associated with a single address that is also already in the map + // the subchannel will be added to the map and be included in outlier detection. + List addressGroups = subchannel.getAllAddresses(); + if (hasSingleAddress(addressGroups) + && trackerMap.containsKey(addressGroups.get(0).getAddresses().get(0))) { + AddressTracker tracker = trackerMap.get(addressGroups.get(0).getAddresses().get(0)); + tracker.addSubchannel(subchannel); + + // If this address has already been ejected, we need to immediately eject this Subchannel. + if (tracker.ejectionTimeNanos != null) { + subchannel.eject(); + } + } + + return subchannel; + } + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + delegate.updateBalancingState(newState, new OutlierDetectionPicker(newPicker)); + } + } + + class OutlierDetectionSubchannel extends ForwardingSubchannel { + + private final Subchannel delegate; + private AddressTracker addressTracker; + private boolean ejected; + private ConnectivityStateInfo lastSubchannelState; + private OutlierDetectionSubchannelStateListener subchannelStateListener; + + OutlierDetectionSubchannel(Subchannel delegate) { + this.delegate = delegate; + } + + @Override + public void start(SubchannelStateListener listener) { + subchannelStateListener = new OutlierDetectionSubchannelStateListener(listener); + super.start(subchannelStateListener); + } + + @Override + public Attributes getAttributes() { + if (addressTracker != null) { + return delegate.getAttributes().toBuilder().set(ADDRESS_TRACKER_ATTR_KEY, addressTracker) + .build(); + } else { + return delegate.getAttributes(); + } + } + + @Override + public void updateAddresses(List addressGroups) { + // Outlier detection only supports subchannels with a single address, but the list of + // addressGroups associated with a subchannel can change at any time, so we need to react to + // changes in the address list plurality. + + // No change in address plurality, we replace the single one with a new one. + if (hasSingleAddress(getAllAddresses()) && hasSingleAddress(addressGroups)) { + // Remove the current subchannel from the old address it is associated with in the map. + if (trackerMap.containsValue(addressTracker)) { + addressTracker.removeSubchannel(this); + } + + // If the map has an entry for the new address, we associate this subchannel with it. + SocketAddress address = addressGroups.get(0).getAddresses().get(0); + if (trackerMap.containsKey(address)) { + trackerMap.get(address).addSubchannel(this); + } + } else if (hasSingleAddress(getAllAddresses()) && !hasSingleAddress(addressGroups)) { + // We go from a single address to having multiple, making this subchannel uneligible for + // outlier detection. Remove it from all trackers and reset the call counters of all the + // associated trackers. + // Remove the current subchannel from the old address it is associated with in the map. + if (trackerMap.containsKey(getAddresses().getAddresses().get(0))) { + AddressTracker tracker = trackerMap.get(getAddresses().getAddresses().get(0)); + tracker.removeSubchannel(this); + tracker.resetCallCounters(); + } + } else if (!hasSingleAddress(getAllAddresses()) && hasSingleAddress(addressGroups)) { + // We go from, previously uneligble, multiple address mode to a single address. If the map + // has an entry for the new address, we associate this subchannel with it. + SocketAddress address = addressGroups.get(0).getAddresses().get(0); + if (trackerMap.containsKey(address)) { + AddressTracker tracker = trackerMap.get(address); + tracker.addSubchannel(this); + } + } + + // We could also have multiple addressGroups and get an update for multiple new ones. This is + // a no-op as we will just continue to ignore multiple address subchannels. + + delegate.updateAddresses(addressGroups); + } + + /** + * If the {@link Subchannel} is considered for outlier detection the associated {@link + * AddressTracker} should be set. + */ + void setAddressTracker(AddressTracker addressTracker) { + this.addressTracker = addressTracker; + } + + void clearAddressTracker() { + this.addressTracker = null; + } + + void eject() { + ejected = true; + subchannelStateListener.onSubchannelState( + ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); + } + + void uneject() { + ejected = false; + if (lastSubchannelState != null) { + subchannelStateListener.onSubchannelState(lastSubchannelState); + } + } + + boolean isEjected() { + return ejected; + } + + @Override + protected Subchannel delegate() { + return delegate; + } + + /** + * Wraps the actual listener so that state changes from the actual one can be intercepted. + */ + class OutlierDetectionSubchannelStateListener implements SubchannelStateListener { + + private final SubchannelStateListener delegate; + + OutlierDetectionSubchannelStateListener(SubchannelStateListener delegate) { + this.delegate = delegate; + } + + @Override + public void onSubchannelState(ConnectivityStateInfo newState) { + lastSubchannelState = newState; + if (!ejected) { + delegate.onSubchannelState(newState); + } + } + } + } + + + /** + * This picker delegates the actual picking logic to a wrapped delegate, but associates a {@link + * ClientStreamTracer} with each pick to track the results of each subchannel stream. + */ + class OutlierDetectionPicker extends SubchannelPicker { + + private final SubchannelPicker delegate; + + OutlierDetectionPicker(SubchannelPicker delegate) { + this.delegate = delegate; + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + PickResult pickResult = delegate.pickSubchannel(args); + + Subchannel subchannel = pickResult.getSubchannel(); + if (subchannel != null) { + return PickResult.withSubchannel(subchannel, + new ResultCountingClientStreamTracerFactory( + subchannel.getAttributes().get(ADDRESS_TRACKER_ATTR_KEY))); + } + + return pickResult; + } + + /** + * Builds instances of {@link ResultCountingClientStreamTracer}. + */ + class ResultCountingClientStreamTracerFactory extends ClientStreamTracer.Factory { + + private final AddressTracker tracker; + + ResultCountingClientStreamTracerFactory(AddressTracker tracker) { + this.tracker = tracker; + } + + @Override + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + return new ResultCountingClientStreamTracer(tracker); + } + } + + /** + * Counts the results (successful/unsuccessful) of a particular {@link + * OutlierDetectionSubchannel}s streams and increments the counter in the associated {@link + * AddressTracker}. + */ + class ResultCountingClientStreamTracer extends ClientStreamTracer { + + AddressTracker tracker; + + public ResultCountingClientStreamTracer(AddressTracker tracker) { + this.tracker = tracker; + } + + @Override + public void streamClosed(Status status) { + tracker.incrementCallCount(status.isOk()); + } + } + } + + /** + * Tracks additional information about a set of equivalent addresses needed for outlier + * detection. + */ + static class AddressTracker { + + private OutlierDetectionLoadBalancerConfig config; + // Marked as volatile to assure that when the inactive counter is swapped in as the new active + // one, all threads see the change and don't hold on to a reference to the now inactive counter. + private volatile CallCounter activeCallCounter = new CallCounter(); + private CallCounter inactiveCallCounter = new CallCounter(); + private Long ejectionTimeNanos; + private int ejectionTimeMultiplier; + private final Set subchannels = new HashSet<>(); + + AddressTracker(OutlierDetectionLoadBalancerConfig config) { + this.config = config; + } + + void setConfig(OutlierDetectionLoadBalancerConfig config) { + this.config = config; + } + + /** + * Adds a subchannel to the tracker, while assuring that the subchannel ejection status is + * updated to match the tracker's if needed. + */ + boolean addSubchannel(OutlierDetectionSubchannel subchannel) { + // Make sure that the subchannel is in the same ejection state as the new tracker it is + // associated with. + if (subchannelsEjected() && !subchannel.isEjected()) { + subchannel.eject(); + } else if (!subchannelsEjected() && subchannel.isEjected()) { + subchannel.uneject(); + } + subchannel.setAddressTracker(this); + return subchannels.add(subchannel); + } + + boolean removeSubchannel(OutlierDetectionSubchannel subchannel) { + subchannel.clearAddressTracker(); + return subchannels.remove(subchannel); + } + + boolean containsSubchannel(OutlierDetectionSubchannel subchannel) { + return subchannels.contains(subchannel); + } + + @VisibleForTesting + Set getSubchannels() { + return ImmutableSet.copyOf(subchannels); + } + + void incrementCallCount(boolean success) { + // If neither algorithm is configured, no point in incrementing counters. + if (config.successRateEjection == null && config.failurePercentageEjection == null) { + return; + } + + if (success) { + activeCallCounter.successCount.getAndIncrement(); + } else { + activeCallCounter.failureCount.getAndIncrement(); + } + } + + @VisibleForTesting + long activeVolume() { + return activeCallCounter.successCount.get() + activeCallCounter.failureCount.get(); + } + + long inactiveVolume() { + return inactiveCallCounter.successCount.get() + inactiveCallCounter.failureCount.get(); + } + + double successRate() { + return ((double) inactiveCallCounter.successCount.get()) / inactiveVolume(); + } + + double failureRate() { + return ((double)inactiveCallCounter.failureCount.get()) / inactiveVolume(); + } + + void resetCallCounters() { + activeCallCounter.reset(); + inactiveCallCounter.reset(); + } + + void decrementEjectionTimeMultiplier() { + // The multiplier should not go negative. + ejectionTimeMultiplier = ejectionTimeMultiplier == 0 ? 0 : ejectionTimeMultiplier - 1; + } + + void resetEjectionTimeMultiplier() { + ejectionTimeMultiplier = 0; + } + + /** + * Swaps the active and inactive counters. + * + *

Note that this method is not thread safe as the swap is not done atomically. This is + * expected to only be called from the timer that is scheduled at a fixed delay, assuring that + * only one timer is active at a time. + */ + void swapCounters() { + inactiveCallCounter.reset(); + CallCounter tempCounter = activeCallCounter; + activeCallCounter = inactiveCallCounter; + inactiveCallCounter = tempCounter; + } + + void ejectSubchannels(long ejectionTimeNanos) { + this.ejectionTimeNanos = ejectionTimeNanos; + ejectionTimeMultiplier++; + for (OutlierDetectionSubchannel subchannel : subchannels) { + subchannel.eject(); + } + } + + /** + * Uneject a currently ejected address. + */ + void unejectSubchannels() { + checkState(ejectionTimeNanos != null, "not currently ejected"); + ejectionTimeNanos = null; + for (OutlierDetectionSubchannel subchannel : subchannels) { + subchannel.uneject(); + } + } + + boolean subchannelsEjected() { + return ejectionTimeNanos != null; + } + + public boolean maxEjectionTimeElapsed(long currentTimeNanos) { + // The instant in time beyond which the address should no longer be ejected. Also making sure + // we honor any maximum ejection time setting. + long maxEjectionDurationSecs + = Math.max(config.baseEjectionTimeNanos, config.maxEjectionTimeNanos); + long maxEjectionTimeNanos = + ejectionTimeNanos + Math.min( + config.baseEjectionTimeNanos * ejectionTimeMultiplier, + maxEjectionDurationSecs); + + return currentTimeNanos > maxEjectionTimeNanos; + } + + /** Tracks both successful and failed call counts. */ + private static class CallCounter { + AtomicLong successCount = new AtomicLong(); + AtomicLong failureCount = new AtomicLong(); + + void reset() { + successCount.set(0); + failureCount.set(0); + } + } + } + + /** + * Maintains a mapping from addresses to their trackers. + */ + static class AddressTrackerMap extends ForwardingMap { + private final Map trackerMap; + + AddressTrackerMap() { + trackerMap = new HashMap<>(); + } + + @Override + protected Map delegate() { + return trackerMap; + } + + void updateTrackerConfigs(OutlierDetectionLoadBalancerConfig config) { + for (AddressTracker tracker: trackerMap.values()) { + tracker.setConfig(config); + } + } + + /** Adds a new tracker for every given address. */ + void putNewTrackers(OutlierDetectionLoadBalancerConfig config, + Collection addresses) { + for (SocketAddress address : addresses) { + if (!trackerMap.containsKey(address)) { + trackerMap.put(address, new AddressTracker(config)); + } + } + } + + /** Resets the call counters for all the trackers in the map. */ + void resetCallCounters() { + for (AddressTracker tracker : trackerMap.values()) { + tracker.resetCallCounters(); + } + } + + /** + * When OD gets disabled we need to uneject any subchannels that may have been ejected and + * to reset the ejection time multiplier. + */ + void cancelTracking() { + for (AddressTracker tracker : trackerMap.values()) { + if (tracker.subchannelsEjected()) { + tracker.unejectSubchannels(); + } + tracker.resetEjectionTimeMultiplier(); + } + } + + /** Swaps the active and inactive counters for each tracker. */ + void swapCounters() { + for (AddressTracker tracker : trackerMap.values()) { + tracker.swapCounters(); + } + } + + /** + * At the end of a timer run we need to decrement the ejection time multiplier for trackers + * that don't have ejected subchannels and uneject ones that have spent the maximum ejection + * time allowed. + */ + void maybeUnejectOutliers(Long detectionTimerStartNanos) { + for (AddressTracker tracker : trackerMap.values()) { + if (!tracker.subchannelsEjected()) { + tracker.decrementEjectionTimeMultiplier(); + } + + if (tracker.subchannelsEjected() && tracker.maxEjectionTimeElapsed( + detectionTimerStartNanos)) { + tracker.unejectSubchannels(); + } + } + } + + /** + * How many percent of the addresses would have their subchannels ejected if we proceeded + * with the next ejection. + */ + double nextEjectionPercentage() { + if (trackerMap.isEmpty()) { + return 0; + } + int totalAddresses = 0; + int ejectedAddresses = 0; + for (AddressTracker tracker : trackerMap.values()) { + totalAddresses++; + if (tracker.subchannelsEjected()) { + ejectedAddresses++; + } + } + return ((double)(ejectedAddresses + 1) / totalAddresses) * 100; + } + } + + + /** + * Implementations provide different ways of ejecting outlier addresses.. + */ + interface OutlierEjectionAlgorithm { + + /** Eject any outlier addresses. */ + void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos); + + /** Builds a list of algorithms that are enabled in the given config. */ + @Nullable + static List forConfig(OutlierDetectionLoadBalancerConfig config) { + ImmutableList.Builder algoListBuilder = ImmutableList.builder(); + if (config.successRateEjection != null) { + algoListBuilder.add(new SuccessRateOutlierEjectionAlgorithm(config)); + } + if (config.failurePercentageEjection != null) { + algoListBuilder.add(new FailurePercentageOutlierEjectionAlgorithm(config)); + } + return algoListBuilder.build(); + } + } + + /** + * This algorithm ejects addresses that don't maintain a required rate of successful calls. The + * required rate is not fixed, but is based on the mean and standard deviation of the success + * rates of all of the addresses. + */ + static class SuccessRateOutlierEjectionAlgorithm implements OutlierEjectionAlgorithm { + + private final OutlierDetectionLoadBalancerConfig config; + + SuccessRateOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config) { + checkArgument(config.successRateEjection != null, "success rate ejection config is null"); + this.config = config; + } + + @Override + public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) { + + // Only consider addresses that have the minimum request volume specified in the config. + List trackersWithVolume = trackersWithVolume(trackerMap, config); + // If we don't have enough addresses with significant volume then there's nothing to do. + if (trackersWithVolume.size() < config.successRateEjection.minimumHosts + || trackersWithVolume.size() == 0) { + return; + } + + // Calculate mean and standard deviation of the fractions of successful calls. + List successRates = new ArrayList<>(); + for (AddressTracker tracker : trackersWithVolume) { + successRates.add(tracker.successRate()); + } + double mean = mean(successRates); + double stdev = standardDeviation(successRates, mean); + + double requiredSuccessRate = + mean - stdev * (config.successRateEjection.stdevFactor / 1000f); + + for (AddressTracker tracker : trackersWithVolume) { + // If an ejection now would take us past the max configured ejection percentage, stop here. + if (trackerMap.nextEjectionPercentage() > config.maxEjectionPercent) { + return; + } + + // If success rate is below the threshold, eject the address. + if (tracker.successRate() < requiredSuccessRate) { + // Only eject some addresses based on the enforcement percentage. + if (new Random().nextInt(100) < config.successRateEjection.enforcementPercentage) { + tracker.ejectSubchannels(ejectionTimeNanos); + } + } + } + } + + /** Returns only the trackers that have the minimum configured volume to be considered. */ + private List trackersWithVolume(AddressTrackerMap trackerMap, + OutlierDetectionLoadBalancerConfig config) { + List trackersWithVolume = new ArrayList<>(); + for (AddressTracker tracker : trackerMap.values()) { + if (tracker.inactiveVolume() >= config.successRateEjection.requestVolume) { + trackersWithVolume.add(tracker); + } + } + return trackersWithVolume; + } + + /** Calculates the mean of the given values. */ + @VisibleForTesting + static double mean(Collection values) { + double totalValue = 0; + for (double value : values) { + totalValue += value; + } + + return totalValue / values.size(); + } + + /** Calculates the standard deviation for the given values and their mean. */ + @VisibleForTesting + static double standardDeviation(Collection values, double mean) { + double squaredDifferenceSum = 0; + for (double value : values) { + double difference = value - mean; + squaredDifferenceSum += difference * difference; + } + double variance = squaredDifferenceSum / values.size(); + + return Math.sqrt(variance); + } + } + + static class FailurePercentageOutlierEjectionAlgorithm implements OutlierEjectionAlgorithm { + + private final OutlierDetectionLoadBalancerConfig config; + + FailurePercentageOutlierEjectionAlgorithm(OutlierDetectionLoadBalancerConfig config) { + this.config = config; + } + + @Override + public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeNanos) { + + // If we don't have the minimum amount of addresses the config calls for, then return. + if (trackerMap.size() < config.failurePercentageEjection.minimumHosts) { + return; + } + + // If this address does not have enough volume to be considered, skip to the next one. + for (AddressTracker tracker : trackerMap.values()) { + // If an ejection now would take us past the max configured ejection percentage stop here. + if (trackerMap.nextEjectionPercentage() > config.maxEjectionPercent) { + return; + } + + if (tracker.inactiveVolume() < config.failurePercentageEjection.requestVolume) { + continue; + } + + // If the failure rate is above the threshold, we should eject... + double maxFailureRate = ((double)config.failurePercentageEjection.threshold) / 100; + if (tracker.failureRate() > maxFailureRate) { + // ...but only enforce this based on the enforcement percentage. + if (new Random().nextInt(100) < config.failurePercentageEjection.enforcementPercentage) { + tracker.ejectSubchannels(ejectionTimeNanos); + } + } + } + } + } + + /** Counts how many addresses are in a given address group. */ + private static boolean hasSingleAddress(List addressGroups) { + int addressCount = 0; + for (EquivalentAddressGroup addressGroup : addressGroups) { + addressCount += addressGroup.getAddresses().size(); + if (addressCount > 1) { + return false; + } + } + return true; + } + + /** + * The configuration for {@link OutlierDetectionLoadBalancer}. + */ + public static final class OutlierDetectionLoadBalancerConfig { + + final Long intervalNanos; + final Long baseEjectionTimeNanos; + final Long maxEjectionTimeNanos; + final Integer maxEjectionPercent; + final SuccessRateEjection successRateEjection; + final FailurePercentageEjection failurePercentageEjection; + final PolicySelection childPolicy; + + private OutlierDetectionLoadBalancerConfig(Long intervalNanos, + Long baseEjectionTimeNanos, + Long maxEjectionTimeNanos, + Integer maxEjectionPercent, + SuccessRateEjection successRateEjection, + FailurePercentageEjection failurePercentageEjection, + PolicySelection childPolicy) { + this.intervalNanos = intervalNanos; + this.baseEjectionTimeNanos = baseEjectionTimeNanos; + this.maxEjectionTimeNanos = maxEjectionTimeNanos; + this.maxEjectionPercent = maxEjectionPercent; + this.successRateEjection = successRateEjection; + this.failurePercentageEjection = failurePercentageEjection; + this.childPolicy = childPolicy; + } + + /** Builds a new {@link OutlierDetectionLoadBalancerConfig}. */ + public static class Builder { + Long intervalNanos = 10_000_000_000L; // 10s + Long baseEjectionTimeNanos = 30_000_000_000L; // 30s + Long maxEjectionTimeNanos = 30_000_000_000L; // 30s + Integer maxEjectionPercent = 10; + SuccessRateEjection successRateEjection; + FailurePercentageEjection failurePercentageEjection; + PolicySelection childPolicy; + + /** The interval between outlier detection sweeps. */ + public Builder setIntervalNanos(Long intervalNanos) { + checkArgument(intervalNanos != null); + this.intervalNanos = intervalNanos; + return this; + } + + /** The base time an address is ejected for. */ + public Builder setBaseEjectionTimeNanos(Long baseEjectionTimeNanos) { + checkArgument(baseEjectionTimeNanos != null); + this.baseEjectionTimeNanos = baseEjectionTimeNanos; + return this; + } + + /** The longest time an address can be ejected. */ + public Builder setMaxEjectionTimeNanos(Long maxEjectionTimeNanos) { + checkArgument(maxEjectionTimeNanos != null); + this.maxEjectionTimeNanos = maxEjectionTimeNanos; + return this; + } + + /** The algorithm agnostic maximum percentage of addresses that can be ejected. */ + public Builder setMaxEjectionPercent(Integer maxEjectionPercent) { + checkArgument(maxEjectionPercent != null); + this.maxEjectionPercent = maxEjectionPercent; + return this; + } + + /** Set to enable success rate ejection. */ + public Builder setSuccessRateEjection( + SuccessRateEjection successRateEjection) { + this.successRateEjection = successRateEjection; + return this; + } + + /** Set to enable failure percentage ejection. */ + public Builder setFailurePercentageEjection( + FailurePercentageEjection failurePercentageEjection) { + this.failurePercentageEjection = failurePercentageEjection; + return this; + } + + /** Sets the child policy the {@link OutlierDetectionLoadBalancer} delegates to. */ + public Builder setChildPolicy(PolicySelection childPolicy) { + checkState(childPolicy != null); + this.childPolicy = childPolicy; + return this; + } + + /** Builds a new instance of {@link OutlierDetectionLoadBalancerConfig}. */ + public OutlierDetectionLoadBalancerConfig build() { + checkState(childPolicy != null); + return new OutlierDetectionLoadBalancerConfig(intervalNanos, baseEjectionTimeNanos, + maxEjectionTimeNanos, maxEjectionPercent, successRateEjection, + failurePercentageEjection, childPolicy); + } + } + + /** The configuration for success rate ejection. */ + public static class SuccessRateEjection { + + final Integer stdevFactor; + final Integer enforcementPercentage; + final Integer minimumHosts; + final Integer requestVolume; + + SuccessRateEjection(Integer stdevFactor, Integer enforcementPercentage, Integer minimumHosts, + Integer requestVolume) { + this.stdevFactor = stdevFactor; + this.enforcementPercentage = enforcementPercentage; + this.minimumHosts = minimumHosts; + this.requestVolume = requestVolume; + } + + /** Builds new instances of {@link SuccessRateEjection}. */ + public static final class Builder { + + Integer stdevFactor = 1900; + Integer enforcementPercentage = 100; + Integer minimumHosts = 5; + Integer requestVolume = 100; + + /** The product of this and the standard deviation of success rates determine the ejection + * threshold. + */ + public Builder setStdevFactor(Integer stdevFactor) { + checkArgument(stdevFactor != null); + this.stdevFactor = stdevFactor; + return this; + } + + /** Only eject this percentage of outliers. */ + public Builder setEnforcementPercentage(Integer enforcementPercentage) { + checkArgument(enforcementPercentage != null); + checkArgument(enforcementPercentage >= 0 && enforcementPercentage <= 100); + this.enforcementPercentage = enforcementPercentage; + return this; + } + + /** The minimum amount of hosts needed for success rate ejection. */ + public Builder setMinimumHosts(Integer minimumHosts) { + checkArgument(minimumHosts != null); + checkArgument(minimumHosts >= 0); + this.minimumHosts = minimumHosts; + return this; + } + + /** The minimum address request volume to be considered for success rate ejection. */ + public Builder setRequestVolume(Integer requestVolume) { + checkArgument(requestVolume != null); + checkArgument(requestVolume >= 0); + this.requestVolume = requestVolume; + return this; + } + + /** Builds a new instance of {@link SuccessRateEjection}. */ + public SuccessRateEjection build() { + return new SuccessRateEjection(stdevFactor, enforcementPercentage, minimumHosts, + requestVolume); + } + } + } + + /** The configuration for failure percentage ejection. */ + public static class FailurePercentageEjection { + final Integer threshold; + final Integer enforcementPercentage; + final Integer minimumHosts; + final Integer requestVolume; + + FailurePercentageEjection(Integer threshold, Integer enforcementPercentage, + Integer minimumHosts, Integer requestVolume) { + this.threshold = threshold; + this.enforcementPercentage = enforcementPercentage; + this.minimumHosts = minimumHosts; + this.requestVolume = requestVolume; + } + + /** For building new {@link FailurePercentageEjection} instances. */ + public static class Builder { + Integer threshold = 85; + Integer enforcementPercentage = 100; + Integer minimumHosts = 5; + Integer requestVolume = 50; + + /** The failure percentage that will result in an address being considered an outlier. */ + public Builder setThreshold(Integer threshold) { + checkArgument(threshold != null); + checkArgument(threshold >= 0 && threshold <= 100); + this.threshold = threshold; + return this; + } + + /** Only eject this percentage of outliers. */ + public Builder setEnforcementPercentage(Integer enforcementPercentage) { + checkArgument(enforcementPercentage != null); + checkArgument(enforcementPercentage >= 0 && enforcementPercentage <= 100); + this.enforcementPercentage = enforcementPercentage; + return this; + } + + /** The minimum amount of host for failure percentage ejection to be enabled. */ + public Builder setMinimumHosts(Integer minimumHosts) { + checkArgument(minimumHosts != null); + checkArgument(minimumHosts >= 0); + this.minimumHosts = minimumHosts; + return this; + } + + /** + * The request volume required for an address to be considered for failure percentage + * ejection. + */ + public Builder setRequestVolume(Integer requestVolume) { + checkArgument(requestVolume != null); + checkArgument(requestVolume >= 0); + this.requestVolume = requestVolume; + return this; + } + + /** Builds a new instance of {@link FailurePercentageEjection}. */ + public FailurePercentageEjection build() { + return new FailurePercentageEjection(threshold, enforcementPercentage, minimumHosts, + requestVolume); + } + } + } + + /** Determine if any outlier detection algorithms are enabled in the config. */ + boolean outlierDetectionEnabled() { + return successRateEjection != null || failurePercentageEjection != null; + } + } +} diff --git a/core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancerProvider.java b/core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancerProvider.java new file mode 100644 index 00000000000..a92f49bd1d2 --- /dev/null +++ b/core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancerProvider.java @@ -0,0 +1,158 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; +import io.grpc.internal.JsonUtil; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.ServiceConfigUtil.LbConfig; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.internal.TimeProvider; +import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; +import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig.FailurePercentageEjection; +import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig.SuccessRateEjection; +import java.util.List; +import java.util.Map; + +public final class OutlierDetectionLoadBalancerProvider extends LoadBalancerProvider { + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return new OutlierDetectionLoadBalancer(helper, TimeProvider.SYSTEM_TIME_PROVIDER); + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return "outlier_detection_experimental"; + } + + @Override + public ConfigOrError parseLoadBalancingPolicyConfig(Map rawConfig) { + // Common configuration. + Long intervalNanos = JsonUtil.getStringAsDuration(rawConfig, "interval"); + Long baseEjectionTimeNanos = JsonUtil.getStringAsDuration(rawConfig, "baseEjectionTime"); + Long maxEjectionTimeNanos = JsonUtil.getStringAsDuration(rawConfig, "maxEjectionTime"); + Integer maxEjectionPercentage = JsonUtil.getNumberAsInteger(rawConfig, + "maxEjectionPercentage"); + + OutlierDetectionLoadBalancerConfig.Builder configBuilder + = new OutlierDetectionLoadBalancerConfig.Builder(); + if (intervalNanos != null) { + configBuilder.setIntervalNanos(intervalNanos); + } + if (baseEjectionTimeNanos != null) { + configBuilder.setBaseEjectionTimeNanos(baseEjectionTimeNanos); + } + if (maxEjectionTimeNanos != null) { + configBuilder.setMaxEjectionTimeNanos(maxEjectionTimeNanos); + } + if (maxEjectionPercentage != null) { + configBuilder.setMaxEjectionPercent(maxEjectionPercentage); + } + + // Success rate ejection specific configuration. + Map rawSuccessRateEjection = JsonUtil.getObject(rawConfig, "successRateEjection"); + if (rawSuccessRateEjection != null) { + SuccessRateEjection.Builder successRateEjectionBuilder = new SuccessRateEjection.Builder(); + + Integer stdevFactor = JsonUtil.getNumberAsInteger(rawSuccessRateEjection, "stdevFactor"); + Integer enforcementPercentage = JsonUtil.getNumberAsInteger(rawSuccessRateEjection, + "enforcementPercentage"); + Integer minimumHosts = JsonUtil.getNumberAsInteger(rawSuccessRateEjection, "minimumHosts"); + Integer requestVolume = JsonUtil.getNumberAsInteger(rawSuccessRateEjection, "requestVolume"); + + if (stdevFactor != null) { + successRateEjectionBuilder.setStdevFactor(stdevFactor); + } + if (enforcementPercentage != null) { + successRateEjectionBuilder.setEnforcementPercentage(enforcementPercentage); + } + if (minimumHosts != null) { + successRateEjectionBuilder.setMinimumHosts(minimumHosts); + } + if (requestVolume != null) { + successRateEjectionBuilder.setRequestVolume(requestVolume); + } + + configBuilder.setSuccessRateEjection(successRateEjectionBuilder.build()); + } + + // Failure percentage ejection specific configuration. + Map rawFailurePercentageEjection = JsonUtil.getObject(rawConfig, + "failurePercentageEjection"); + if (rawFailurePercentageEjection != null) { + FailurePercentageEjection.Builder failurePercentageEjectionBuilder + = new FailurePercentageEjection.Builder(); + + Integer threshold = JsonUtil.getNumberAsInteger(rawFailurePercentageEjection, "threshold"); + Integer enforcementPercentage = JsonUtil.getNumberAsInteger(rawFailurePercentageEjection, + "enforcementPercentage"); + Integer minimumHosts = JsonUtil.getNumberAsInteger(rawFailurePercentageEjection, + "minimumHosts"); + Integer requestVolume = JsonUtil.getNumberAsInteger(rawFailurePercentageEjection, + "requestVolume"); + + if (threshold != null) { + failurePercentageEjectionBuilder.setThreshold(threshold); + } + if (enforcementPercentage != null) { + failurePercentageEjectionBuilder.setEnforcementPercentage(enforcementPercentage); + } + if (minimumHosts != null) { + failurePercentageEjectionBuilder.setMinimumHosts(minimumHosts); + } + if (requestVolume != null) { + failurePercentageEjectionBuilder.setRequestVolume(requestVolume); + } + + configBuilder.setFailurePercentageEjection(failurePercentageEjectionBuilder.build()); + } + + // Child load balancer configuration. + List childConfigCandidates = ServiceConfigUtil.unwrapLoadBalancingConfigList( + JsonUtil.getListOfObjects(rawConfig, "childPolicy")); + if (childConfigCandidates == null || childConfigCandidates.isEmpty()) { + return ConfigOrError.fromError(Status.INTERNAL.withDescription( + "No child policy in outlier_detection_experimental LB policy: " + + rawConfig)); + } + ConfigOrError selectedConfig = + ServiceConfigUtil.selectLbPolicyFromList(childConfigCandidates, + LoadBalancerRegistry.getDefaultRegistry()); + if (selectedConfig.getError() != null) { + return selectedConfig; + } + configBuilder.setChildPolicy((PolicySelection) selectedConfig.getConfig()); + + return ConfigOrError.fromConfig(configBuilder.build()); + } +} diff --git a/core/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider b/core/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider index cb200d5f044..d68a57c4eb3 100644 --- a/core/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider +++ b/core/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider @@ -1,2 +1,3 @@ io.grpc.internal.PickFirstLoadBalancerProvider io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider +io.grpc.util.OutlierDetectionLoadBalancerProvider diff --git a/core/src/test/java/io/grpc/internal/FakeClock.java b/core/src/test/java/io/grpc/internal/FakeClock.java index f5d22651271..9cc9178f1ff 100644 --- a/core/src/test/java/io/grpc/internal/FakeClock.java +++ b/core/src/test/java/io/grpc/internal/FakeClock.java @@ -159,8 +159,10 @@ private void schedule(ScheduledTask task, long delay, TimeUnit unit) { } @Override public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - throw new UnsupportedOperationException(); + Runnable cmd, long initialDelay, long delay, TimeUnit unit) { + ScheduledTask task = new ScheduleWithFixedDelayTask(cmd, delay, unit); + schedule(task, initialDelay, unit); + return task; } @Override public boolean awaitTermination(long timeout, TimeUnit unit) { @@ -234,6 +236,24 @@ public ScheduleAtFixedRateTask(Runnable command, long period, TimeUnit unit) { } } } + + class ScheduleWithFixedDelayTask extends ScheduledTask { + + final long delayNanos; + + ScheduleWithFixedDelayTask(Runnable command, long delay, TimeUnit unit) { + super(command); + this.delayNanos = unit.toNanos(delay); + } + + @Override + void run() { + command.run(); + if (!isCancelled()) { + schedule(this, delayNanos, TimeUnit.NANOSECONDS); + } + } + } } /** diff --git a/core/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerProviderTest.java b/core/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerProviderTest.java new file mode 100644 index 00000000000..5a27e6f176f --- /dev/null +++ b/core/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerProviderTest.java @@ -0,0 +1,141 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.grpc.InternalServiceProviders; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.SynchronizationContext; +import io.grpc.internal.JsonParser; +import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link OutlierDetectionLoadBalancerProvider}. + */ +@RunWith(JUnit4.class) +public class OutlierDetectionLoadBalancerProviderTest { + + private final SynchronizationContext syncContext = new SynchronizationContext( + new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + private final OutlierDetectionLoadBalancerProvider provider + = new OutlierDetectionLoadBalancerProvider(); + + @Test + public void provided() { + for (LoadBalancerProvider current : InternalServiceProviders.getCandidatesViaServiceLoader( + LoadBalancerProvider.class, getClass().getClassLoader())) { + if (current instanceof OutlierDetectionLoadBalancerProvider) { + return; + } + } + fail("OutlierDetectionLoadBalancerProvider not registered"); + } + + @Test + public void providesLoadBalancer() { + Helper helper = mock(Helper.class); + when(helper.getSynchronizationContext()).thenReturn(syncContext); + when(helper.getScheduledExecutorService()).thenReturn(mock(ScheduledExecutorService.class)); + assertThat(provider.newLoadBalancer(helper)) + .isInstanceOf(OutlierDetectionLoadBalancer.class); + } + + @Test + public void parseLoadBalancingConfig_defaults() throws IOException { + String lbConfig = + "{ \"successRateEjection\" : {}, " + + "\"failurePercentageEjection\" : {}, " + + "\"childPolicy\" : [{\"round_robin\" : {}}]}"; + ConfigOrError configOrError = + provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig)); + assertThat(configOrError.getConfig()).isNotNull(); + OutlierDetectionLoadBalancerConfig config + = (OutlierDetectionLoadBalancerConfig) configOrError.getConfig(); + assertThat(config.successRateEjection).isNotNull(); + assertThat(config.failurePercentageEjection).isNotNull(); + assertThat(config.childPolicy.getProvider().getPolicyName()).isEqualTo("round_robin"); + } + + @Test + public void parseLoadBalancingConfig_valuesSet() throws IOException { + String lbConfig = + "{\"interval\" : \"100s\"," + + " \"baseEjectionTime\" : \"100s\"," + + " \"maxEjectionTime\" : \"100s\"," + + " \"maxEjectionPercentage\" : 100," + + " \"successRateEjection\" : {" + + " \"stdevFactor\" : 100," + + " \"enforcementPercentage\" : 100," + + " \"minimumHosts\" : 100," + + " \"requestVolume\" : 100" + + " }," + + " \"failurePercentageEjection\" : {" + + " \"threshold\" : 100," + + " \"enforcementPercentage\" : 100," + + " \"minimumHosts\" : 100," + + " \"requestVolume\" : 100" + + " }," + + "\"childPolicy\" : [{\"round_robin\" : {}}]}"; + ConfigOrError configOrError = + provider.parseLoadBalancingPolicyConfig(parseJsonObject(lbConfig)); + assertThat(configOrError.getConfig()).isNotNull(); + OutlierDetectionLoadBalancerConfig config + = (OutlierDetectionLoadBalancerConfig) configOrError.getConfig(); + + assertThat(config.intervalNanos).isEqualTo(100_000_000_000L); + assertThat(config.baseEjectionTimeNanos).isEqualTo(100_000_000_000L); + assertThat(config.maxEjectionTimeNanos).isEqualTo(100_000_000_000L); + assertThat(config.maxEjectionPercent).isEqualTo(100); + + assertThat(config.successRateEjection).isNotNull(); + assertThat(config.successRateEjection.stdevFactor).isEqualTo(100); + assertThat(config.successRateEjection.enforcementPercentage).isEqualTo(100); + assertThat(config.successRateEjection.minimumHosts).isEqualTo(100); + assertThat(config.successRateEjection.requestVolume).isEqualTo(100); + + assertThat(config.failurePercentageEjection).isNotNull(); + assertThat(config.failurePercentageEjection.threshold).isEqualTo(100); + assertThat(config.failurePercentageEjection.enforcementPercentage).isEqualTo(100); + assertThat(config.failurePercentageEjection.minimumHosts).isEqualTo(100); + assertThat(config.failurePercentageEjection.requestVolume).isEqualTo(100); + + assertThat(config.childPolicy.getProvider().getPolicyName()).isEqualTo("round_robin"); + } + + @SuppressWarnings("unchecked") + private static Map parseJsonObject(String json) throws IOException { + return (Map) JsonParser.parse(json); + } +} diff --git a/core/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java b/core/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java new file mode 100644 index 00000000000..5b73edb4c2c --- /dev/null +++ b/core/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java @@ -0,0 +1,1077 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; +import static io.grpc.ConnectivityState.READY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.grpc.ClientStreamTracer; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.CreateSubchannelArgs; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.ResolvedAddresses; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancer.SubchannelStateListener; +import io.grpc.LoadBalancerProvider; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.internal.FakeClock; +import io.grpc.internal.FakeClock.ScheduledTask; +import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.internal.TestUtils.StandardLoadBalancerProvider; +import io.grpc.util.OutlierDetectionLoadBalancer.AddressTracker; +import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig; +import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig.FailurePercentageEjection; +import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig.SuccessRateEjection; +import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionSubchannel; +import io.grpc.util.OutlierDetectionLoadBalancer.SuccessRateOutlierEjectionAlgorithm; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.stubbing.Answer; + +/** + * Unit tests for {@link OutlierDetectionLoadBalancer}. + */ +@RunWith(JUnit4.class) +public class OutlierDetectionLoadBalancerTest { + + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock + private LoadBalancer mockChildLb; + @Mock + private Helper mockHelper; + @Mock + private SocketAddress mockSocketAddress; + + @Captor + private ArgumentCaptor connectivityStateCaptor; + @Captor + private ArgumentCaptor errorPickerCaptor; + @Captor + private ArgumentCaptor pickerCaptor; + @Captor + private ArgumentCaptor stateCaptor; + + private final LoadBalancerProvider mockChildLbProvider = new StandardLoadBalancerProvider( + "foo_policy") { + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return mockChildLb; + } + }; + private final LoadBalancerProvider roundRobinLbProvider = new StandardLoadBalancerProvider( + "round_robin") { + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return new RoundRobinLoadBalancer(helper); + } + }; + + private final FakeClock fakeClock = new FakeClock(); + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + private OutlierDetectionLoadBalancer loadBalancer; + + private final List servers = Lists.newArrayList(); + private final Map, Subchannel> subchannels = Maps.newLinkedHashMap(); + private final Map subchannelStateListeners + = Maps.newLinkedHashMap(); + + private Subchannel subchannel1; + private Subchannel subchannel2; + private Subchannel subchannel3; + private Subchannel subchannel4; + private Subchannel subchannel5; + + @Before + public void setUp() { + for (int i = 0; i < 5; i++) { + SocketAddress addr = new FakeSocketAddress("server" + i); + EquivalentAddressGroup eag = new EquivalentAddressGroup(addr); + servers.add(eag); + Subchannel sc = mock(Subchannel.class); + subchannels.put(Arrays.asList(eag), sc); + } + + Iterator subchannelIterator = subchannels.values().iterator(); + subchannel1 = subchannelIterator.next(); + subchannel2 = subchannelIterator.next(); + subchannel3 = subchannelIterator.next(); + subchannel4 = subchannelIterator.next(); + subchannel5 = subchannelIterator.next(); + + when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); + when(mockHelper.getScheduledExecutorService()).thenReturn( + fakeClock.getScheduledExecutorService()); + when(mockHelper.createSubchannel(any(CreateSubchannelArgs.class))).then( + new Answer() { + @Override + public Subchannel answer(InvocationOnMock invocation) throws Throwable { + CreateSubchannelArgs args = (CreateSubchannelArgs) invocation.getArguments()[0]; + final Subchannel subchannel = subchannels.get(args.getAddresses()); + when(subchannel.getAllAddresses()).thenReturn(args.getAddresses()); + when(subchannel.getAttributes()).thenReturn(args.getAttributes()); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + subchannelStateListeners.put(subchannel, + (SubchannelStateListener) invocation.getArguments()[0]); + return null; + } + }).when(subchannel).start(any(SubchannelStateListener.class)); + return subchannel; + } + }); + + loadBalancer = new OutlierDetectionLoadBalancer(mockHelper, fakeClock.getTimeProvider()); + } + + @Test + public void handleNameResolutionError_noChildLb() { + loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED); + + verify(mockHelper).updateBalancingState(connectivityStateCaptor.capture(), + errorPickerCaptor.capture()); + assertThat(connectivityStateCaptor.getValue()).isEqualTo(ConnectivityState.TRANSIENT_FAILURE); + } + + @Test + public void handleNameResolutionError_withChildLb() { + loadBalancer.handleResolvedAddresses(buildResolvedAddress( + new OutlierDetectionLoadBalancerConfig.Builder() + .setSuccessRateEjection(new SuccessRateEjection.Builder().build()) + .setChildPolicy(new PolicySelection(mockChildLbProvider, null)).build(), + new EquivalentAddressGroup(mockSocketAddress))); + loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED); + + verify(mockChildLb).handleNameResolutionError(Status.DEADLINE_EXCEEDED); + } + + /** + * {@code shutdown()} is simply delegated. + */ + @Test + public void shutdown() { + loadBalancer.handleResolvedAddresses(buildResolvedAddress( + new OutlierDetectionLoadBalancerConfig.Builder() + .setSuccessRateEjection(new SuccessRateEjection.Builder().build()) + .setChildPolicy(new PolicySelection(mockChildLbProvider, null)).build(), + new EquivalentAddressGroup(mockSocketAddress))); + loadBalancer.shutdown(); + verify(mockChildLb).shutdown(); + } + + /** + * Base case for accepting new resolved addresses. + */ + @Test + public void handleResolvedAddresses() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setSuccessRateEjection(new SuccessRateEjection.Builder().build()) + .setChildPolicy(new PolicySelection(mockChildLbProvider, null)).build(); + ResolvedAddresses resolvedAddresses = buildResolvedAddress(config, + new EquivalentAddressGroup(mockSocketAddress)); + + loadBalancer.handleResolvedAddresses(resolvedAddresses); + + // Handling of resolved addresses is delegated + verify(mockChildLb).handleResolvedAddresses(resolvedAddresses); + + // There is a single pending task to run the outlier detection algorithm + assertThat(fakeClock.getPendingTasks()).hasSize(1); + + // The task is scheduled to run after a delay set in the config. + ScheduledTask task = fakeClock.getPendingTasks().iterator().next(); + assertThat(task.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(config.intervalNanos); + } + + /** + * Outlier detection first enabled, then removed. + */ + @Test + public void handleResolvedAddresses_outlierDetectionDisabled() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setSuccessRateEjection(new SuccessRateEjection.Builder().build()) + .setChildPolicy(new PolicySelection(mockChildLbProvider, null)).build(); + ResolvedAddresses resolvedAddresses = buildResolvedAddress(config, + new EquivalentAddressGroup(mockSocketAddress)); + + loadBalancer.handleResolvedAddresses(resolvedAddresses); + + fakeClock.forwardTime(15, TimeUnit.SECONDS); + + // There is a single pending task to run the outlier detection algorithm + assertThat(fakeClock.getPendingTasks()).hasSize(1); + + config = new OutlierDetectionLoadBalancerConfig.Builder().setChildPolicy( + new PolicySelection(mockChildLbProvider, null)).build(); + loadBalancer.handleResolvedAddresses( + buildResolvedAddress(config, new EquivalentAddressGroup(mockSocketAddress))); + + // Pending task should be gone since OD is disabled. + assertThat(fakeClock.getPendingTasks()).isEmpty(); + + } + + /** + * Tests different scenarios when the timer interval in the config changes. + */ + @Test + public void handleResolvedAddresses_intervalUpdate() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setSuccessRateEjection(new SuccessRateEjection.Builder().build()) + .setChildPolicy(new PolicySelection(mockChildLbProvider, null)).build(); + ResolvedAddresses resolvedAddresses = buildResolvedAddress(config, + new EquivalentAddressGroup(mockSocketAddress)); + + loadBalancer.handleResolvedAddresses(resolvedAddresses); + + // Config update has doubled the interval + config = new OutlierDetectionLoadBalancerConfig.Builder() + .setIntervalNanos(config.intervalNanos * 2) + .setSuccessRateEjection(new SuccessRateEjection.Builder().build()) + .setChildPolicy(new PolicySelection(mockChildLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses( + buildResolvedAddress(config, new EquivalentAddressGroup(mockSocketAddress))); + + // If the timer has not run yet the task is just rescheduled to run after the new delay. + assertThat(fakeClock.getPendingTasks()).hasSize(1); + ScheduledTask task = fakeClock.getPendingTasks().iterator().next(); + assertThat(task.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(config.intervalNanos); + assertThat(task.dueTimeNanos).isEqualTo(config.intervalNanos); + + // The new interval time has passed. The next task due time should have been pushed back another + // interval. + forwardTime(config); + assertThat(fakeClock.getPendingTasks()).hasSize(1); + task = fakeClock.getPendingTasks().iterator().next(); + assertThat(task.dueTimeNanos).isEqualTo(config.intervalNanos + config.intervalNanos + 1); + + // Some time passes and a second update comes down, but now the timer has had a chance to run, + // the new delay to timer start should consider when the timer last ran and if the interval is + // not changing in the config, the next task due time should remain unchanged. + fakeClock.forwardTime(4, TimeUnit.SECONDS); + task = fakeClock.getPendingTasks().iterator().next(); + loadBalancer.handleResolvedAddresses( + buildResolvedAddress(config, new EquivalentAddressGroup(mockSocketAddress))); + assertThat(task.dueTimeNanos).isEqualTo(config.intervalNanos + config.intervalNanos + 1); + } + + /** + * Confirm basic picking works by delegating to round_robin. + */ + @Test + public void delegatePick() throws Exception { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setSuccessRateEjection(new SuccessRateEjection.Builder().build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers.get(0))); + + // Make one of the subchannels READY. + final Subchannel readySubchannel = subchannels.values().iterator().next(); + deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY)); + + verify(mockHelper, times(3)).updateBalancingState(stateCaptor.capture(), + pickerCaptor.capture()); + + // Make sure that we can pick the single READY subchannel. + SubchannelPicker picker = pickerCaptor.getAllValues().get(2); + PickResult pickResult = picker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(((OutlierDetectionSubchannel) pickResult.getSubchannel()).delegate()).isEqualTo( + readySubchannel); + } + + /** + * The success rate algorithm leaves a healthy set of addresses alone. + */ + @Test + public void successRateNoOutliers() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder().setMinimumHosts(3).setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of()); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // No outliers, no ejections. + assertEjectedSubchannels(ImmutableSet.of()); + } + + /** + * The success rate algorithm ejects the outlier. + */ + @Test + public void successRateOneOutlier() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // The one subchannel that was returning errors should be ejected. + assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + } + + /** + * The success rate algorithm ejects the outlier, but then the config changes so that similar + * behavior no longer gets ejected. + */ + @Test + public void successRateOneOutlier_configChange() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // The one subchannel that was returning errors should be ejected. + assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + + // New config sets enforcement percentage to 0. + config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10) + .setEnforcementPercentage(0).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel2, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // Since we brought enforcement percentage to 0, no additional ejection should have happened. + assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + } + + /** + * The success rate algorithm ejects the outlier but after some time it should get unejected + * if it stops being an outlier.. + */ + @Test + public void successRateOneOutlier_unejected() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + fakeClock.forwardTime(config.intervalNanos + 1, TimeUnit.NANOSECONDS); + + // The one subchannel that was returning errors should be ejected. + assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + + // Now we produce more load, but the subchannel start working and is no longer an outlier. + generateLoad(ImmutableMap.of()); + + // Move forward in time to a point where the detection timer has fired. + fakeClock.forwardTime(config.maxEjectionTimeNanos + 1, TimeUnit.NANOSECONDS); + + // No subchannels should remain ejected. + assertEjectedSubchannels(ImmutableSet.of()); + } + + /** + * The success rate algorithm ignores addresses without enough volume. + */ + @Test + public void successRateOneOutlier_notEnoughVolume() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(20).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + // We produce an outlier, but don't give it enough calls to reach the minimum volume. + generateLoad( + ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED), + ImmutableMap.of(subchannel1, 19)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // The address should not have been ejected. + assertEjectedSubchannels(ImmutableSet.of()); + } + + /** + * The success rate algorithm does not apply if enough addresses have the required volume. + */ + @Test + public void successRateOneOutlier_notEnoughAddressesWithVolume() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(6) // We don't have this many hosts... + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // No subchannels should have been ejected. + assertEjectedSubchannels(ImmutableSet.of()); + } + + /** + * The enforcementPercentage configuration should be honored. + */ + @Test + public void successRateOneOutlier_enforcementPercentage() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10) + .setEnforcementPercentage(0) + .build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // There is one outlier, but because enforcementPercentage is 0, nothing should be ejected. + assertEjectedSubchannels(ImmutableSet.of()); + } + + /** + * Two outliers get ejected. + */ + @Test + public void successRateTwoOutliers() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10) + .setStdevFactor(1).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of( + subchannel1, Status.DEADLINE_EXCEEDED, + subchannel2, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // The one subchannel that was returning errors should be ejected. + assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0), + servers.get(1).getAddresses().get(0))); + } + + /** + * Two outliers. but only one gets ejected because we have reached the max ejection percentage. + */ + @Test + public void successRateTwoOutliers_maxEjectionPercentage() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(20) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10) + .setStdevFactor(1).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of( + subchannel1, Status.DEADLINE_EXCEEDED, + subchannel2, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + int totalEjected = 0; + for (EquivalentAddressGroup addressGroup: servers) { + totalEjected += + loadBalancer.trackerMap.get(addressGroup.getAddresses().get(0)).subchannelsEjected() ? 1 + : 0; + } + + // Even if all subchannels were failing, we should have not ejected more than the configured + // maximum percentage. + assertThat((double) totalEjected / servers.size()).isAtMost( + (double) config.maxEjectionPercent / 100); + } + + + /** + * The success rate algorithm leaves a healthy set of addresses alone. + */ + @Test + public void failurePercentageNoOutliers() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + // By default all calls will return OK. + generateLoad(ImmutableMap.of()); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // No outliers, no ejections. + assertEjectedSubchannels(ImmutableSet.of()); + } + + /** + * The success rate algorithm ejects the outlier. + */ + @Test + public void failurePercentageOneOutlier() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // The one subchannel that was returning errors should be ejected. + assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + } + + /** + * The failure percentage algorithm ignores addresses without enough volume.. + */ + @Test + public void failurePercentageOneOutlier_notEnoughVolume() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(100).build()) // We won't produce this much volume... + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // We should see no ejections. + assertEjectedSubchannels(ImmutableSet.of()); + } + + /** + * The enforcementPercentage configuration should be honored. + */ + @Test + public void failurePercentageOneOutlier_enforcementPercentage() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10) + .setEnforcementPercentage(0) + .build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // There is one outlier, but because enforcementPercentage is 0, nothing should be ejected. + assertEjectedSubchannels(ImmutableSet.of()); + } + + /** Success rate detects two outliers and error percentage three. */ + @Test + public void successRateAndFailurePercentageThreeOutliers() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(100) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10) + .setStdevFactor(1).build()) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setThreshold(0) + .setMinimumHosts(3) + .setRequestVolume(1) + .build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + // Three subchannels with problems, but one only has a single call that failed. + // This is not enough for success rate to catch, but failure percentage is + // configured with a 0 tolerance threshold. + generateLoad( + ImmutableMap.of( + subchannel1, Status.DEADLINE_EXCEEDED, + subchannel2, Status.DEADLINE_EXCEEDED, + subchannel3, Status.DEADLINE_EXCEEDED), + ImmutableMap.of(subchannel3, 1)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // Should see thee ejected, success rate cathes the first two, error percentage the + // same two plus the subchannel with the single failure. + assertEjectedSubchannels(ImmutableSet.of( + servers.get(0).getAddresses().get(0), + servers.get(1).getAddresses().get(0), + servers.get(2).getAddresses().get(0))); + } + + /** + * When the address a subchannel is associated with changes it should get tracked under the new + * address and its ejection state should match what the address has. + */ + @Test + public void subchannelUpdateAddress_singleReplaced() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + EquivalentAddressGroup oldAddressGroup = servers.get(0); + AddressTracker oldAddressTracker = loadBalancer.trackerMap.get( + oldAddressGroup.getAddresses().get(0)); + EquivalentAddressGroup newAddressGroup = servers.get(1); + AddressTracker newAddressTracker = loadBalancer.trackerMap.get( + newAddressGroup.getAddresses().get(0)); + + // The one subchannel that was returning errors should be ejected. + assertEjectedSubchannels(ImmutableSet.of(oldAddressGroup.getAddresses().get(0))); + + // The ejected subchannel gets updated with another address in the map that is not ejected + OutlierDetectionSubchannel subchannel = oldAddressTracker.getSubchannels() + .iterator().next(); + subchannel.updateAddresses(ImmutableList.of(newAddressGroup)); + + // The replaced address should no longer have the subchannel associated with it. + assertThat(oldAddressTracker.getSubchannels()).doesNotContain(subchannel); + + // The new address should instead have the subchannel. + assertThat(newAddressTracker.getSubchannels()).contains(subchannel); + + // Since the new address is not ejected, the ejected subchannel moving over to it should also + // become unejected. + assertThat(subchannel.isEjected()).isFalse(); + } + + /** + * If a single address gets replaced by multiple, the subchannel becomes uneligible for outlier + * detection. + */ + @Test + public void subchannelUpdateAddress_singleReplacedWithMultiple() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of()); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + EquivalentAddressGroup oldAddressGroup = servers.get(0); + AddressTracker oldAddressTracker = loadBalancer.trackerMap.get( + oldAddressGroup.getAddresses().get(0)); + EquivalentAddressGroup newAddress1 = servers.get(1); + EquivalentAddressGroup newAddress2 = servers.get(2); + + OutlierDetectionSubchannel subchannel = oldAddressTracker.getSubchannels() + .iterator().next(); + + // The subchannel gets updated with two new addresses + ImmutableList addressUpdate + = ImmutableList.of(newAddress1, newAddress2); + subchannel.updateAddresses(addressUpdate); + when(subchannel1.getAllAddresses()).thenReturn(addressUpdate); + + // The replaced address should no longer be tracked. + assertThat(oldAddressTracker.getSubchannels()).doesNotContain(subchannel); + + // The old tracker should also have its call counters cleared. + assertThat(oldAddressTracker.activeVolume()).isEqualTo(0); + assertThat(oldAddressTracker.inactiveVolume()).isEqualTo(0); + } + + /** + * A subchannel with multiple addresses will again become eligible for outlier detection if it + * receives an update with a single address. + * + *

TODO: Figure out how to test this scenario, round_robin does not support multiple addresses + * and fails the transition from multiple addresses to single. + */ + @Ignore + public void subchannelUpdateAddress_multipleReplacedWithSingle() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + EquivalentAddressGroup oldAddressGroup = servers.get(0); + AddressTracker oldAddressTracker = loadBalancer.trackerMap.get( + oldAddressGroup.getAddresses().get(0)); + EquivalentAddressGroup newAddressGroup1 = servers.get(1); + AddressTracker newAddressTracker1 = loadBalancer.trackerMap.get( + newAddressGroup1.getAddresses().get(0)); + EquivalentAddressGroup newAddressGroup2 = servers.get(2); + + // The old subchannel was returning errors and should be ejected. + assertEjectedSubchannels(ImmutableSet.of(oldAddressGroup.getAddresses().get(0))); + + OutlierDetectionSubchannel subchannel = oldAddressTracker.getSubchannels() + .iterator().next(); + + // The subchannel gets updated with two new addresses + ImmutableList addressUpdate + = ImmutableList.of(newAddressGroup1, newAddressGroup2); + subchannel.updateAddresses(addressUpdate); + when(subchannel1.getAllAddresses()).thenReturn(addressUpdate); + + // The replaced address should no longer be tracked. + assertThat(oldAddressTracker.getSubchannels()).doesNotContain(subchannel); + + // The old tracker should also have its call counters cleared. + assertThat(oldAddressTracker.activeVolume()).isEqualTo(0); + assertThat(oldAddressTracker.inactiveVolume()).isEqualTo(0); + + // Another update takes the subchannel back to a single address. + addressUpdate = ImmutableList.of(newAddressGroup1); + subchannel.updateAddresses(addressUpdate); + when(subchannel1.getAllAddresses()).thenReturn(addressUpdate); + + // The subchannel is now associated with the single new address. + assertThat(newAddressTracker1.getSubchannels()).contains(subchannel); + + // The previously ejected subchannel should become unejected as it is now associated with an + // unejected address. + assertThat(subchannel.isEjected()).isFalse(); + } + + /** Both algorithms configured, but no outliers. */ + @Test + public void successRateAndFailurePercentage_noOutliers() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of()); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // No outliers, no ejections. + assertEjectedSubchannels(ImmutableSet.of()); + } + + /** Both algorithms configured, success rate detects an outlier. */ + @Test + public void successRateAndFailurePercentage_successRateOutlier() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10) + .setEnforcementPercentage(0).build()) // Configured, but not enforcing. + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // The one subchannel that was returning errors should be ejected. + assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + } + + /** Both algorithms configured, error percentage detects an outlier. */ + @Test + public void successRateAndFailurePercentage_errorPercentageOutlier() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10) + .setEnforcementPercentage(0).build()) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) // Configured, but not enforcing. + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + forwardTime(config); + + // The one subchannel that was returning errors should be ejected. + assertEjectedSubchannels(ImmutableSet.of(servers.get(0).getAddresses().get(0))); + } + + @Test + public void mathChecksOut() { + ImmutableList values = ImmutableList.of(600d, 470d, 170d, 430d, 300d); + double mean = SuccessRateOutlierEjectionAlgorithm.mean(values); + double stdev = SuccessRateOutlierEjectionAlgorithm.standardDeviation(values, mean); + + assertThat(mean).isEqualTo(394); + assertThat(stdev).isEqualTo(147.32277488562318); + } + + private static class FakeSocketAddress extends SocketAddress { + + final String name; + + FakeSocketAddress(String name) { + this.name = name; + } + + @Override + public String toString() { + return "FakeSocketAddress-" + name; + } + } + + private ResolvedAddresses buildResolvedAddress(OutlierDetectionLoadBalancerConfig config, + EquivalentAddressGroup... servers) { + return ResolvedAddresses.newBuilder().setAddresses(ImmutableList.copyOf(servers)) + .setLoadBalancingPolicyConfig(config).build(); + } + + private ResolvedAddresses buildResolvedAddress(OutlierDetectionLoadBalancerConfig config, + List servers) { + return ResolvedAddresses.newBuilder().setAddresses(ImmutableList.copyOf(servers)) + .setLoadBalancingPolicyConfig(config).build(); + } + + private void deliverSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { + subchannelStateListeners.get(subchannel).onSubchannelState(newState); + } + + private void generateLoad(Map statusMap) { + generateLoad(statusMap, null); + } + + // Generates 100 calls, 20 each across the subchannels. Default status is OK. + private void generateLoad(Map statusMap, + Map maxCallsMap) { + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel5, ConnectivityStateInfo.forNonError(READY)); + + verify(mockHelper, times(7)).updateBalancingState(stateCaptor.capture(), + pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getAllValues().get(6); + + HashMap callCountMap = new HashMap<>(); + for (int i = 0; i < 100; i++) { + PickResult pickResult = picker + .pickSubchannel(mock(PickSubchannelArgs.class)); + ClientStreamTracer clientStreamTracer = pickResult.getStreamTracerFactory() + .newClientStreamTracer(null, null); + + Subchannel subchannel = ((OutlierDetectionSubchannel) pickResult.getSubchannel()).delegate(); + + int maxCalls = + maxCallsMap != null && maxCallsMap.containsKey(subchannel) + ? maxCallsMap.get(subchannel) : Integer.MAX_VALUE; + int calls = callCountMap.containsKey(subchannel) ? callCountMap.get(subchannel) : 0; + if (calls < maxCalls) { + callCountMap.put(subchannel, ++calls); + clientStreamTracer.streamClosed( + statusMap.containsKey(subchannel) ? statusMap.get(subchannel) : Status.OK); + } + } + } + + // Forwards time past the moment when the timer will fire. + private void forwardTime(OutlierDetectionLoadBalancerConfig config) { + fakeClock.forwardTime(config.intervalNanos + 1, TimeUnit.NANOSECONDS); + } + + // Asserts that the given addresses are ejected and the rest are not. + void assertEjectedSubchannels(Set addresses) { + for (Entry entry : loadBalancer.trackerMap.entrySet()) { + assertWithMessage("not ejected: " + entry.getKey()) + .that(entry.getValue().subchannelsEjected()) + .isEqualTo(addresses.contains(entry.getKey())); + } + } +}