diff --git a/core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java b/core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java index bc692ed6518d..a0676dea6109 100644 --- a/core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java +++ b/core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ForwardingMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import io.grpc.Attributes; import io.grpc.ClientStreamTracer; @@ -71,7 +72,8 @@ public class OutlierDetectionLoadBalancer extends LoadBalancer { private ScheduledHandle detectionTimerHandle; private Long detectionTimerStartNanos; - private static final Attributes.Key EAG_INFO_ATTR_KEY = Attributes.Key.create("eagInfoKey"); + private static final Attributes.Key EAG_INFO_ATTR_KEY = Attributes.Key.create( + "eagInfoKey"); /** * Creates a new instance of {@link OutlierDetectionLoadBalancer}. @@ -257,14 +259,6 @@ public void updateAddresses(List addresses) { if (trackerMap.containsKey(newAddress)) { AddressTracker tracker = trackerMap.get(newAddress); tracker.addSubchannel(this); - - // Make sure that the subchannel is in the same ejection state as the new tracker it is - // associated with. - if (tracker.subchannelsEjected() && !ejected) { - eject(); - } else if (!tracker.subchannelsEjected() && ejected) { - uneject(); - } } } else if (getAllAddresses().size() == 1 && addresses.size() > 1) { // We go from a single address to having multiple, making this subchannel uneligible for @@ -277,24 +271,19 @@ public void updateAddresses(List addresses) { tracker.clearCallCounters(); } } else if (getAllAddresses().size() > 1 && addresses.size() == 1) { - // If the map has an entry for the new address, we associate this subchannel with it. + // We go from, previously uneligble, multiple address mode to a single address. If the map + // has an entry for the new address, we associate this subchannel with it. EquivalentAddressGroup eag = Iterables.getOnlyElement(addresses); if (trackerMap.containsKey(eag)) { AddressTracker tracker = trackerMap.get(eag); tracker.addSubchannel(this); - - // If the new address is already in the ejected state, we should also eject this - // subchannel. - if (tracker.subchannelsEjected()) { - eject(); - } } } // We could also have multiple addresses and get an update for multiple new ones. This is // a no-op as we will just continue to ignore multiple address subchannels. - super.updateAddresses(addresses); + delegate.updateAddresses(addresses); } /** @@ -318,6 +307,10 @@ void uneject() { } } + boolean isEjected() { + return ejected; + } + @Override protected Subchannel delegate() { return delegate; @@ -365,8 +358,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { if (subchannel != null) { return PickResult.withSubchannel(subchannel, new ResultCountingClientStreamTracerFactory( - (AddressTracker) subchannel.getAttributes() - .get(EAG_INFO_ATTR_KEY))); + subchannel.getAttributes().get(EAG_INFO_ATTR_KEY))); } return pickResult; @@ -427,6 +419,13 @@ static class AddressTracker { } boolean addSubchannel(OutlierDetectionSubchannel subchannel) { + // Make sure that the subchannel is in the same ejection state as the new tracker it is + // associated with. + if (subchannelsEjected() && !subchannel.isEjected()) { + subchannel.eject(); + } else if (!subchannelsEjected() && subchannel.isEjected()) { + subchannel.uneject(); + } return subchannels.add(subchannel); } @@ -438,6 +437,11 @@ boolean containsSubchannel(OutlierDetectionSubchannel subchannel) { return subchannels.contains(subchannel); } + @VisibleForTesting + Set getSubchannels() { + return ImmutableSet.copyOf(subchannels); + } + void incrementCallCount(boolean success) { // If neither algorithm is configured, no point in incrementing counters. if (config.successRateEjection == null && config.failurePercentageEjection == null) { @@ -451,19 +455,21 @@ void incrementCallCount(boolean success) { } } - /** - * The total number of calls in the inactive call counter. - */ - long volume() { + @VisibleForTesting + long activeVolume() { + return activeCallCounter.successCount.get() + activeCallCounter.failureCount.get(); + } + + long inactiveVolume() { return inactiveCallCounter.successCount.get() + inactiveCallCounter.failureCount.get(); } double successRate() { - return ((double) inactiveCallCounter.successCount.get()) / volume(); + return ((double) inactiveCallCounter.successCount.get()) / inactiveVolume(); } double failureRate() { - return ((double)inactiveCallCounter.failureCount.get()) / volume(); + return ((double)inactiveCallCounter.failureCount.get()) / inactiveVolume(); } void clearCallCounters() { @@ -488,9 +494,6 @@ void swapCounters() { inactiveCallCounter = tempCounter; } - /** - * Ejects the address from use. - */ void ejectSubchannels(long ejectionTimeNanos) { this.ejectionTimeNanos = ejectionTimeNanos; ejectionTimeMultiplier++; @@ -503,7 +506,7 @@ void ejectSubchannels(long ejectionTimeNanos) { * Uneject a currently ejected address. */ void unejectSubchannels() { - checkState(ejectionTimeNanos == null, "not currently ejected"); + checkState(ejectionTimeNanos != null, "not currently ejected"); ejectionTimeNanos = null; for (OutlierDetectionSubchannel subchannel : subchannels) { subchannel.uneject(); @@ -606,7 +609,7 @@ void maybeUnejectOutliers(Long detectionTimerStartNanos) { List trackersWithVolume(OutlierDetectionLoadBalancerConfig config) { List trackersWithVolume = new ArrayList<>(); for (AddressTracker tracker : trackerMap.values()) { - if (tracker.volume() >= config.successRateEjection.requestVolume) { + if (tracker.inactiveVolume() >= config.successRateEjection.requestVolume) { trackersWithVolume.add(tracker); } } @@ -751,7 +754,7 @@ public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeMillis) return; } - if (tracker.volume() < config.failurePercentageEjection.requestVolume) { + if (tracker.inactiveVolume() < config.failurePercentageEjection.requestVolume) { continue; } @@ -796,6 +799,7 @@ private OutlierDetectionLoadBalancerConfig(Long intervalSecs, this.childPolicy = childPolicy; } + /** Builds a new {@link OutlierDetectionLoadBalancerConfig}. */ public static class Builder { Long intervalSecs = 10L; Long baseEjectionTimeSecs = 30L; @@ -805,48 +809,56 @@ public static class Builder { FailurePercentageEjection failurePercentageEjection; PolicySelection childPolicy; + /** The interval between outlier detection sweeps. */ public Builder setIntervalSecs(Long intervalSecs) { checkArgument(intervalSecs != null); this.intervalSecs = intervalSecs; return this; } + /** The base time an address is ejected for. */ public Builder setBaseEjectionTimeSecs(Long baseEjectionTimeSecs) { checkArgument(baseEjectionTimeSecs != null); this.baseEjectionTimeSecs = baseEjectionTimeSecs; return this; } + /** The longest time an address can be ejected. */ public Builder setMaxEjectionTimeSecs(Long maxEjectionTimeSecs) { checkArgument(maxEjectionTimeSecs != null); this.maxEjectionTimeSecs = maxEjectionTimeSecs; return this; } + /** The algorithm agnostic maximum percentage of addresses that can be ejected. */ public Builder setMaxEjectionPercent(Integer maxEjectionPercent) { checkArgument(maxEjectionPercent != null); this.maxEjectionPercent = maxEjectionPercent; return this; } + /** Set to enable success rate eejction. */ public Builder setSuccessRateEjection( SuccessRateEjection successRateEjection) { this.successRateEjection = successRateEjection; return this; } + /** Set to enable failure percentage ejection. */ public Builder setFailurePercentageEjection( FailurePercentageEjection failurePercentageEjection) { this.failurePercentageEjection = failurePercentageEjection; return this; } + /** Sets the child policy the {@link OutlierDetectionLoadBalancer} delegates to. */ public Builder setChildPolicy(PolicySelection childPolicy) { checkState(childPolicy != null); this.childPolicy = childPolicy; return this; } + /** Builds a new instance of {@link OutlierDetectionLoadBalancerConfig}. */ public OutlierDetectionLoadBalancerConfig build() { checkState(childPolicy != null); return new OutlierDetectionLoadBalancerConfig(intervalSecs, baseEjectionTimeSecs, @@ -855,6 +867,7 @@ public OutlierDetectionLoadBalancerConfig build() { } } + /** The configuration for success rate ejection. */ public static class SuccessRateEjection { final Integer stdevFactor; @@ -870,6 +883,7 @@ public static class SuccessRateEjection { this.requestVolume = requestVolume; } + /** Builds new instances of {@link SuccessRateEjection}. */ public static final class Builder { Integer stdevFactor = 1900; @@ -877,30 +891,37 @@ public static final class Builder { Integer minimumHosts = 5; Integer requestVolume = 100; + /** The product of this and the standard deviation of success rates determine the ejection + * threshold. + */ public Builder setStdevFactor(Integer stdevFactor) { checkArgument(stdevFactor != null); this.stdevFactor = stdevFactor; return this; } + /** Only eject this percentage of outliers. */ public Builder setEnforcementPercentage(Integer enforcementPercentage) { checkArgument(enforcementPercentage != null); this.enforcementPercentage = enforcementPercentage; return this; } + /** The minimum amount of hosts needed for success rate ejection. */ public Builder setMinimumHosts(Integer minimumHosts) { checkArgument(minimumHosts != null); this.minimumHosts = minimumHosts; return this; } + /** The minimum address request volume to be considered for success rate ejection. */ public Builder setRequestVolume(Integer requestVolume) { checkArgument(requestVolume != null); this.requestVolume = requestVolume; return this; } + /** Builds a new instance of {@link SuccessRateEjection}. */ public SuccessRateEjection build() { return new SuccessRateEjection(stdevFactor, enforcementPercentage, minimumHosts, requestVolume); @@ -908,6 +929,7 @@ public SuccessRateEjection build() { } } + /** The configuration for failure percentage ejection. */ public static class FailurePercentageEjection { final Integer threshold; final Integer enforcementPercentage; @@ -922,36 +944,45 @@ public static class FailurePercentageEjection { this.requestVolume = requestVolume; } + /** For building new {@link FailurePercentageEjection} instances. */ public static class Builder { Integer threshold = 85; Integer enforcementPercentage = 100; Integer minimumHosts = 5; Integer requestVolume = 50; + /** The failure percentage that will result in an address being considered an outlier. */ public Builder setThreshold(Integer threshold) { checkArgument(threshold != null); this.threshold = threshold; return this; } + /** Only eject this percentage of outliers. */ public Builder setEnforcementPercentage(Integer enforcementPercentage) { checkArgument(enforcementPercentage != null); this.enforcementPercentage = enforcementPercentage; return this; } + /** The minimum amount of host for failure percentage ejection to be enabled. */ public Builder setMinimumHosts(Integer minimumHosts) { checkArgument(minimumHosts != null); this.minimumHosts = minimumHosts; return this; } + /** + * The request volume required for an address to be considered for failure percentage + * ejection. + */ public Builder setRequestVolume(Integer requestVolume) { checkArgument(requestVolume != null); this.requestVolume = requestVolume; return this; } + /** Builds a new instance of {@link FailurePercentageEjection}. */ public FailurePercentageEjection build() { return new FailurePercentageEjection(threshold, enforcementPercentage, minimumHosts, requestVolume); diff --git a/core/src/test/java/io/grpc/internal/FakeClock.java b/core/src/test/java/io/grpc/internal/FakeClock.java index b6a88de00ba0..9cc9178f1ff7 100644 --- a/core/src/test/java/io/grpc/internal/FakeClock.java +++ b/core/src/test/java/io/grpc/internal/FakeClock.java @@ -248,7 +248,6 @@ class ScheduleWithFixedDelayTask extends ScheduledTask { @Override void run() { - long startTimeNanos = currentTimeNanos; command.run(); if (!isCancelled()) { schedule(this, delayNanos, TimeUnit.NANOSECONDS); diff --git a/core/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java b/core/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java index add4ca27522a..f08860d6c530 100644 --- a/core/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java @@ -68,6 +68,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -89,8 +90,6 @@ public class OutlierDetectionLoadBalancerTest { @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); - @Mock - private Helper helper; @Mock private LoadBalancer mockChildLb; @Mock @@ -106,8 +105,6 @@ public class OutlierDetectionLoadBalancerTest { private ArgumentCaptor pickerCaptor; @Captor private ArgumentCaptor stateCaptor; - @Captor - private ArgumentCaptor createArgsCaptor; private final LoadBalancerProvider mockChildLbProvider = new StandardLoadBalancerProvider( "foo_policy") { @@ -132,12 +129,12 @@ public void uncaughtException(Thread t, Throwable e) { throw new AssertionError(e); } }); - private LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry(); private OutlierDetectionLoadBalancer loadBalancer; private final List servers = Lists.newArrayList(); private final Map, Subchannel> subchannels = Maps.newLinkedHashMap(); - private final Map subchannelStateListeners = Maps.newLinkedHashMap(); + private final Map subchannelStateListeners + = Maps.newLinkedHashMap(); private Subchannel subchannel1; private Subchannel subchannel2; @@ -185,8 +182,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable { } }); - lbRegistry.register(roundRobinLbProvider); - loadBalancer = new OutlierDetectionLoadBalancer(mockHelper, fakeClock.getTimeProvider()); } @@ -362,13 +357,13 @@ public void successRateNoOutliers() { loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); - generateLoad(config, ImmutableMap.of()); + generateLoad(ImmutableMap.of()); // Move forward in time to a point where the detection timer has fired. fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); // No outliers, no ejections. - assertEjectedChannels(ImmutableSet.of()); + assertEjectedSubchannels(ImmutableSet.of()); } /** @@ -384,13 +379,49 @@ public void successRateOneOutlier() { .setRequestVolume(10).build()) .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); - generateLoad(config, ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); // Move forward in time to a point where the detection timer has fired. fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); // The one subchannel that was returning errors should be ejected. - assertEjectedChannels(ImmutableSet.of(servers.get(0))); + assertEjectedSubchannels(ImmutableSet.of(servers.get(0))); + } + + /** + * The success rate algorithm ejects the outlier but after some time it should get unejected + * if it stops being an outlier.. + */ + @Test + public void successRateOneOutlier_unejected() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setSuccessRateEjection( + new SuccessRateEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); + + // The one subchannel that was returning errors should be ejected. + assertEjectedSubchannels(ImmutableSet.of(servers.get(0))); + + // Now we produce more load, but the subchannel start working and is no longer an outlier. + generateLoad(ImmutableMap.of()); + + // Move forward in time to a point where the detection timer has fired. + fakeClock.forwardTime(config.maxEjectionTimeSecs + 1, TimeUnit.SECONDS); + + // No subchannels should remain ejected. + assertEjectedSubchannels(ImmutableSet.of()); } /** @@ -406,13 +437,15 @@ public void successRateOneOutlier_notEnoughVolume() { .setRequestVolume(100).build()) // We won't produce this much volume... .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); - generateLoad(config, ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); // Move forward in time to a point where the detection timer has fired. fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); // The address should not have been ejected.. - assertEjectedChannels(ImmutableSet.of()); + assertEjectedSubchannels(ImmutableSet.of()); } /** @@ -428,13 +461,15 @@ public void successRateOneOutlier_notEnoughAddressesWithVolume() { .setRequestVolume(10).build()) .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); - generateLoad(config, ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); // Move forward in time to a point where the detection timer has fired. fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); // No subchannels should have been ejected. - assertEjectedChannels(ImmutableSet.of()); + assertEjectedSubchannels(ImmutableSet.of()); } /** @@ -452,13 +487,15 @@ public void successRateOneOutlier_enforcementPercentage() { .build()) .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); - generateLoad(config, ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); // Move forward in time to a point where the detection timer has fired. fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); // There is one outlier, but because enforcementPercentage is 0, nothing should be ejected. - assertEjectedChannels(ImmutableSet.of()); + assertEjectedSubchannels(ImmutableSet.of()); } /** @@ -475,7 +512,9 @@ public void successRateTwoOutliers() { .setStdevFactor(1).build()) .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); - generateLoad(config, ImmutableMap.of( + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of( subchannel1, Status.DEADLINE_EXCEEDED, subchannel2, Status.DEADLINE_EXCEEDED)); @@ -483,13 +522,7 @@ public void successRateTwoOutliers() { fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); // The one subchannel that was returning errors should be ejected. - assertThat(loadBalancer.trackerMap.get(servers.get(0)).subchannelsEjected()).isTrue(); - assertThat(loadBalancer.trackerMap.get(servers.get(1)).subchannelsEjected()).isTrue(); - assertThat(loadBalancer.trackerMap.get(servers.get(2)).subchannelsEjected()).isFalse(); - assertThat(loadBalancer.trackerMap.get(servers.get(3)).subchannelsEjected()).isFalse(); - assertThat(loadBalancer.trackerMap.get(servers.get(4)).subchannelsEjected()).isFalse(); - - //assertEjectedChannels(ImmutableSet.of(servers.get(0), servers.get(1))); + assertEjectedSubchannels(ImmutableSet.of(servers.get(0), servers.get(1))); } /** @@ -506,7 +539,9 @@ public void successRateTwoOutliers_maxEjectionPercentage() { .setStdevFactor(1).build()) .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); - generateLoad(config, ImmutableMap.of( + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of( subchannel1, Status.DEADLINE_EXCEEDED, subchannel2, Status.DEADLINE_EXCEEDED)); @@ -541,13 +576,13 @@ public void failurePercentageNoOutliers() { loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); // By default all calls will return OK. - generateLoad(config, ImmutableMap.of()); + generateLoad(ImmutableMap.of()); // Move forward in time to a point where the detection timer has fired. fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); // No outliers, no ejections. - assertEjectedChannels(ImmutableSet.of()); + assertEjectedSubchannels(ImmutableSet.of()); } /** @@ -563,13 +598,15 @@ public void failurePercentageOneOutlier() { .setRequestVolume(10).build()) .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); - generateLoad(config, ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); // Move forward in time to a point where the detection timer has fired. fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); // The one subchannel that was returning errors should be ejected. - assertEjectedChannels(ImmutableSet.of(servers.get(0))); + assertEjectedSubchannels(ImmutableSet.of(servers.get(0))); } /** @@ -585,13 +622,15 @@ public void failurePercentageOneOutlier_notEnoughVolume() { .setRequestVolume(100).build()) // We won't produce this much volume... .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); - generateLoad(config, ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); // Move forward in time to a point where the detection timer has fired. fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); // We should see no ejections. - assertEjectedChannels(ImmutableSet.of()); + assertEjectedSubchannels(ImmutableSet.of()); } /** @@ -609,13 +648,165 @@ public void failurePercentageOneOutlier_enforcementPercentage() { .build()) .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); - generateLoad(config, ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); // Move forward in time to a point where the detection timer has fired. fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); // There is one outlier, but because enforcementPercentage is 0, nothing should be ejected. - assertEjectedChannels(ImmutableSet.of()); + assertEjectedSubchannels(ImmutableSet.of()); + } + + /** + * When the address a subchannel is associated with changes it should get tracked under the new + * address and its ejection state should match what the address has. + */ + @Test + public void subchannelUpdateAddress_singleReplaced() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); + + EquivalentAddressGroup oldAddress = servers.get(0); + AddressTracker oldAddressTracker = loadBalancer.trackerMap.get(oldAddress); + EquivalentAddressGroup newAddress = servers.get(1); + AddressTracker newAddressTracker = loadBalancer.trackerMap.get(newAddress); + + // The one subchannel that was returning errors should be ejected. + assertEjectedSubchannels(ImmutableSet.of(oldAddress)); + + // The ejected subchannel gets updated with another address in the map that is not ejected + OutlierDetectionSubchannel subchannel = oldAddressTracker.getSubchannels() + .iterator().next(); + subchannel.updateAddresses(ImmutableList.of(newAddress)); + + // The replaced address should no longer have the subchannel associated with it. + assertThat(oldAddressTracker.getSubchannels()).doesNotContain(subchannel); + + // The new address should instead have the subchannel. + assertThat(newAddressTracker.getSubchannels()).contains(subchannel); + + // Since the new address is not ejected, the ejected subchannel moving over to it should also + // become unejected. + assertThat(subchannel.isEjected()).isFalse(); + } + + /** + * If a single address gets replaced by multiple, the subchannel becomes uneligible for outlier + * detection. + */ + @Test + public void subchannelUpdateAddress_singleReplacedWithMultiple() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of()); + + // Move forward in time to a point where the detection timer has fired. + fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); + + EquivalentAddressGroup oldAddress = servers.get(0); + AddressTracker oldAddressTracker = loadBalancer.trackerMap.get(oldAddress); + EquivalentAddressGroup newAddress1 = servers.get(1); + EquivalentAddressGroup newAddress2 = servers.get(2); + + OutlierDetectionSubchannel subchannel = oldAddressTracker.getSubchannels() + .iterator().next(); + + // The subchannel gets updated with two new addresses + ImmutableList addressUpdate + = ImmutableList.of(newAddress1, newAddress2); + subchannel.updateAddresses(addressUpdate); + when(subchannel1.getAllAddresses()).thenReturn(addressUpdate); + + // The replaced address should no longer be tracked. + assertThat(oldAddressTracker.getSubchannels()).doesNotContain(subchannel); + + // The old tracker should also have its call counters cleared. + assertThat(oldAddressTracker.activeVolume()).isEqualTo(0); + assertThat(oldAddressTracker.inactiveVolume()).isEqualTo(0); + } + + /** + * A subchannel with multiple addresses will again become eligible for outlier detection if it + * receives an update with a single address. + * + *

TODO: Figure out how to test this scenario, round_robin does not support multiple addresses + * and fails the transition from multiple addresses to single. + */ + @Ignore + public void subchannelUpdateAddress_multipleReplacedWithSingle() { + OutlierDetectionLoadBalancerConfig config = new OutlierDetectionLoadBalancerConfig.Builder() + .setMaxEjectionPercent(50) + .setFailurePercentageEjection( + new FailurePercentageEjection.Builder() + .setMinimumHosts(3) + .setRequestVolume(10).build()) + .setChildPolicy(new PolicySelection(roundRobinLbProvider, null)).build(); + + loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); + + generateLoad(ImmutableMap.of(subchannel1, Status.DEADLINE_EXCEEDED)); + + // Move forward in time to a point where the detection timer has fired. + fakeClock.forwardTime(config.intervalSecs + 1, TimeUnit.SECONDS); + + EquivalentAddressGroup oldAddress = servers.get(0); + AddressTracker oldAddressTracker = loadBalancer.trackerMap.get(oldAddress); + EquivalentAddressGroup newAddress1 = servers.get(1); + AddressTracker newAddressTracker1 = loadBalancer.trackerMap.get(newAddress1); + EquivalentAddressGroup newAddress2 = servers.get(2); + + // The old subchannel was returning errors and should be ejected. + assertEjectedSubchannels(ImmutableSet.of(oldAddress)); + + OutlierDetectionSubchannel subchannel = oldAddressTracker.getSubchannels() + .iterator().next(); + + // The subchannel gets updated with two new addresses + ImmutableList addressUpdate + = ImmutableList.of(newAddress1, newAddress2); + subchannel.updateAddresses(addressUpdate); + when(subchannel1.getAllAddresses()).thenReturn(addressUpdate); + + // The replaced address should no longer be tracked. + assertThat(oldAddressTracker.getSubchannels()).doesNotContain(subchannel); + + // The old tracker should also have its call counters cleared. + assertThat(oldAddressTracker.activeVolume()).isEqualTo(0); + assertThat(oldAddressTracker.inactiveVolume()).isEqualTo(0); + + // Another update takes the subchannel back to a single address. + addressUpdate = ImmutableList.of(newAddress1); + subchannel.updateAddresses(addressUpdate); + when(subchannel1.getAllAddresses()).thenReturn(addressUpdate); + + // The subchannel is now associated with the single new address. + assertThat(newAddressTracker1.getSubchannels()).contains(subchannel); + + // The previously ejected subchannel should become unejected as it is now associated with an + // unejected address. + assertThat(subchannel.isEjected()).isFalse(); } @Test @@ -658,16 +849,8 @@ private void deliverSubchannelState(Subchannel subchannel, ConnectivityStateInfo subchannelStateListeners.get(subchannel).onSubchannelState(newState); } - private static List getList(SubchannelPicker picker) { - return picker instanceof ReadyPicker ? ((ReadyPicker) picker).getList() - : Collections.emptyList(); - } - - // Generates 30 calls, 10 each across the subchannels. - private void generateLoad(OutlierDetectionLoadBalancerConfig config, - Map statusMap) { - loadBalancer.handleResolvedAddresses(buildResolvedAddress(config, servers)); - + // Generates 100 calls, 20 each across the subchannels. Default status is OK. + private void generateLoad(Map statusMap) { deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); @@ -684,7 +867,6 @@ private void generateLoad(OutlierDetectionLoadBalancerConfig config, ClientStreamTracer clientStreamTracer = pickResult.getStreamTracerFactory() .newClientStreamTracer(null, null); - // Fail all the calls for the first subchannel. Subchannel subchannel = ((OutlierDetectionSubchannel) pickResult.getSubchannel()).delegate(); clientStreamTracer.streamClosed( statusMap.containsKey(subchannel) ? statusMap.get(subchannel) : Status.OK); @@ -692,7 +874,7 @@ private void generateLoad(OutlierDetectionLoadBalancerConfig config, } // Asserts that the given addresses are ejected and the rest are not. - void assertEjectedChannels(Set addresses) { + void assertEjectedSubchannels(Set addresses) { for (Entry entry : loadBalancer.trackerMap.entrySet()) { assertThat(entry.getValue().subchannelsEjected()).isEqualTo( addresses.contains(entry.getKey()));