Skip to content

Commit

Permalink
Added tests for subchannel address updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
temawi committed Aug 15, 2022
1 parent ee4e748 commit d275d72
Show file tree
Hide file tree
Showing 3 changed files with 291 additions and 79 deletions.
93 changes: 62 additions & 31 deletions core/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java
Expand Up @@ -24,6 +24,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ForwardingMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.ClientStreamTracer;
Expand Down Expand Up @@ -71,7 +72,8 @@ public class OutlierDetectionLoadBalancer extends LoadBalancer {
private ScheduledHandle detectionTimerHandle;
private Long detectionTimerStartNanos;

private static final Attributes.Key EAG_INFO_ATTR_KEY = Attributes.Key.create("eagInfoKey");
private static final Attributes.Key<AddressTracker> EAG_INFO_ATTR_KEY = Attributes.Key.create(
"eagInfoKey");

/**
* Creates a new instance of {@link OutlierDetectionLoadBalancer}.
Expand Down Expand Up @@ -257,14 +259,6 @@ public void updateAddresses(List<EquivalentAddressGroup> addresses) {
if (trackerMap.containsKey(newAddress)) {
AddressTracker tracker = trackerMap.get(newAddress);
tracker.addSubchannel(this);

// Make sure that the subchannel is in the same ejection state as the new tracker it is
// associated with.
if (tracker.subchannelsEjected() && !ejected) {
eject();
} else if (!tracker.subchannelsEjected() && ejected) {
uneject();
}
}
} else if (getAllAddresses().size() == 1 && addresses.size() > 1) {
// We go from a single address to having multiple, making this subchannel uneligible for
Expand All @@ -277,24 +271,19 @@ public void updateAddresses(List<EquivalentAddressGroup> addresses) {
tracker.clearCallCounters();
}
} else if (getAllAddresses().size() > 1 && addresses.size() == 1) {
// If the map has an entry for the new address, we associate this subchannel with it.
// We go from, previously uneligble, multiple address mode to a single address. If the map
// has an entry for the new address, we associate this subchannel with it.
EquivalentAddressGroup eag = Iterables.getOnlyElement(addresses);
if (trackerMap.containsKey(eag)) {
AddressTracker tracker = trackerMap.get(eag);
tracker.addSubchannel(this);

// If the new address is already in the ejected state, we should also eject this
// subchannel.
if (tracker.subchannelsEjected()) {
eject();
}
}
}

// We could also have multiple addresses and get an update for multiple new ones. This is
// a no-op as we will just continue to ignore multiple address subchannels.

super.updateAddresses(addresses);
delegate.updateAddresses(addresses);
}

/**
Expand All @@ -318,6 +307,10 @@ void uneject() {
}
}

boolean isEjected() {
return ejected;
}

@Override
protected Subchannel delegate() {
return delegate;
Expand Down Expand Up @@ -365,8 +358,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
if (subchannel != null) {
return PickResult.withSubchannel(subchannel,
new ResultCountingClientStreamTracerFactory(
(AddressTracker) subchannel.getAttributes()
.get(EAG_INFO_ATTR_KEY)));
subchannel.getAttributes().get(EAG_INFO_ATTR_KEY)));
}

return pickResult;
Expand Down Expand Up @@ -427,6 +419,13 @@ static class AddressTracker {
}

boolean addSubchannel(OutlierDetectionSubchannel subchannel) {
// Make sure that the subchannel is in the same ejection state as the new tracker it is
// associated with.
if (subchannelsEjected() && !subchannel.isEjected()) {
subchannel.eject();
} else if (!subchannelsEjected() && subchannel.isEjected()) {
subchannel.uneject();
}
return subchannels.add(subchannel);
}

Expand All @@ -438,6 +437,11 @@ boolean containsSubchannel(OutlierDetectionSubchannel subchannel) {
return subchannels.contains(subchannel);
}

@VisibleForTesting
Set<OutlierDetectionSubchannel> getSubchannels() {
return ImmutableSet.copyOf(subchannels);
}

void incrementCallCount(boolean success) {
// If neither algorithm is configured, no point in incrementing counters.
if (config.successRateEjection == null && config.failurePercentageEjection == null) {
Expand All @@ -451,19 +455,21 @@ void incrementCallCount(boolean success) {
}
}

/**
* The total number of calls in the inactive call counter.
*/
long volume() {
@VisibleForTesting
long activeVolume() {
return activeCallCounter.successCount.get() + activeCallCounter.failureCount.get();
}

long inactiveVolume() {
return inactiveCallCounter.successCount.get() + inactiveCallCounter.failureCount.get();
}

double successRate() {
return ((double) inactiveCallCounter.successCount.get()) / volume();
return ((double) inactiveCallCounter.successCount.get()) / inactiveVolume();
}

double failureRate() {
return ((double)inactiveCallCounter.failureCount.get()) / volume();
return ((double)inactiveCallCounter.failureCount.get()) / inactiveVolume();
}

void clearCallCounters() {
Expand All @@ -488,9 +494,6 @@ void swapCounters() {
inactiveCallCounter = tempCounter;
}

/**
* Ejects the address from use.
*/
void ejectSubchannels(long ejectionTimeNanos) {
this.ejectionTimeNanos = ejectionTimeNanos;
ejectionTimeMultiplier++;
Expand All @@ -503,7 +506,7 @@ void ejectSubchannels(long ejectionTimeNanos) {
* Uneject a currently ejected address.
*/
void unejectSubchannels() {
checkState(ejectionTimeNanos == null, "not currently ejected");
checkState(ejectionTimeNanos != null, "not currently ejected");
ejectionTimeNanos = null;
for (OutlierDetectionSubchannel subchannel : subchannels) {
subchannel.uneject();
Expand Down Expand Up @@ -606,7 +609,7 @@ void maybeUnejectOutliers(Long detectionTimerStartNanos) {
List<AddressTracker> trackersWithVolume(OutlierDetectionLoadBalancerConfig config) {
List<AddressTracker> trackersWithVolume = new ArrayList<>();
for (AddressTracker tracker : trackerMap.values()) {
if (tracker.volume() >= config.successRateEjection.requestVolume) {
if (tracker.inactiveVolume() >= config.successRateEjection.requestVolume) {
trackersWithVolume.add(tracker);
}
}
Expand Down Expand Up @@ -751,7 +754,7 @@ public void ejectOutliers(AddressTrackerMap trackerMap, long ejectionTimeMillis)
return;
}

if (tracker.volume() < config.failurePercentageEjection.requestVolume) {
if (tracker.inactiveVolume() < config.failurePercentageEjection.requestVolume) {
continue;
}

Expand Down Expand Up @@ -796,6 +799,7 @@ private OutlierDetectionLoadBalancerConfig(Long intervalSecs,
this.childPolicy = childPolicy;
}

/** Builds a new {@link OutlierDetectionLoadBalancerConfig}. */
public static class Builder {
Long intervalSecs = 10L;
Long baseEjectionTimeSecs = 30L;
Expand All @@ -805,48 +809,56 @@ public static class Builder {
FailurePercentageEjection failurePercentageEjection;
PolicySelection childPolicy;

/** The interval between outlier detection sweeps. */
public Builder setIntervalSecs(Long intervalSecs) {
checkArgument(intervalSecs != null);
this.intervalSecs = intervalSecs;
return this;
}

/** The base time an address is ejected for. */
public Builder setBaseEjectionTimeSecs(Long baseEjectionTimeSecs) {
checkArgument(baseEjectionTimeSecs != null);
this.baseEjectionTimeSecs = baseEjectionTimeSecs;
return this;
}

/** The longest time an address can be ejected. */
public Builder setMaxEjectionTimeSecs(Long maxEjectionTimeSecs) {
checkArgument(maxEjectionTimeSecs != null);
this.maxEjectionTimeSecs = maxEjectionTimeSecs;
return this;
}

/** The algorithm agnostic maximum percentage of addresses that can be ejected. */
public Builder setMaxEjectionPercent(Integer maxEjectionPercent) {
checkArgument(maxEjectionPercent != null);
this.maxEjectionPercent = maxEjectionPercent;
return this;
}

/** Set to enable success rate eejction. */
public Builder setSuccessRateEjection(
SuccessRateEjection successRateEjection) {
this.successRateEjection = successRateEjection;
return this;
}

/** Set to enable failure percentage ejection. */
public Builder setFailurePercentageEjection(
FailurePercentageEjection failurePercentageEjection) {
this.failurePercentageEjection = failurePercentageEjection;
return this;
}

/** Sets the child policy the {@link OutlierDetectionLoadBalancer} delegates to. */
public Builder setChildPolicy(PolicySelection childPolicy) {
checkState(childPolicy != null);
this.childPolicy = childPolicy;
return this;
}

/** Builds a new instance of {@link OutlierDetectionLoadBalancerConfig}. */
public OutlierDetectionLoadBalancerConfig build() {
checkState(childPolicy != null);
return new OutlierDetectionLoadBalancerConfig(intervalSecs, baseEjectionTimeSecs,
Expand All @@ -855,6 +867,7 @@ public OutlierDetectionLoadBalancerConfig build() {
}
}

/** The configuration for success rate ejection. */
public static class SuccessRateEjection {

final Integer stdevFactor;
Expand All @@ -870,44 +883,53 @@ public static class SuccessRateEjection {
this.requestVolume = requestVolume;
}

/** Builds new instances of {@link SuccessRateEjection}. */
public static final class Builder {

Integer stdevFactor = 1900;
Integer enforcementPercentage = 100;
Integer minimumHosts = 5;
Integer requestVolume = 100;

/** The product of this and the standard deviation of success rates determine the ejection
* threshold.
*/
public Builder setStdevFactor(Integer stdevFactor) {
checkArgument(stdevFactor != null);
this.stdevFactor = stdevFactor;
return this;
}

/** Only eject this percentage of outliers. */
public Builder setEnforcementPercentage(Integer enforcementPercentage) {
checkArgument(enforcementPercentage != null);
this.enforcementPercentage = enforcementPercentage;
return this;
}

/** The minimum amount of hosts needed for success rate ejection. */
public Builder setMinimumHosts(Integer minimumHosts) {
checkArgument(minimumHosts != null);
this.minimumHosts = minimumHosts;
return this;
}

/** The minimum address request volume to be considered for success rate ejection. */
public Builder setRequestVolume(Integer requestVolume) {
checkArgument(requestVolume != null);
this.requestVolume = requestVolume;
return this;
}

/** Builds a new instance of {@link SuccessRateEjection}. */
public SuccessRateEjection build() {
return new SuccessRateEjection(stdevFactor, enforcementPercentage, minimumHosts,
requestVolume);
}
}
}

/** The configuration for failure percentage ejection. */
public static class FailurePercentageEjection {
final Integer threshold;
final Integer enforcementPercentage;
Expand All @@ -922,36 +944,45 @@ public static class FailurePercentageEjection {
this.requestVolume = requestVolume;
}

/** For building new {@link FailurePercentageEjection} instances. */
public static class Builder {
Integer threshold = 85;
Integer enforcementPercentage = 100;
Integer minimumHosts = 5;
Integer requestVolume = 50;

/** The failure percentage that will result in an address being considered an outlier. */
public Builder setThreshold(Integer threshold) {
checkArgument(threshold != null);
this.threshold = threshold;
return this;
}

/** Only eject this percentage of outliers. */
public Builder setEnforcementPercentage(Integer enforcementPercentage) {
checkArgument(enforcementPercentage != null);
this.enforcementPercentage = enforcementPercentage;
return this;
}

/** The minimum amount of host for failure percentage ejection to be enabled. */
public Builder setMinimumHosts(Integer minimumHosts) {
checkArgument(minimumHosts != null);
this.minimumHosts = minimumHosts;
return this;
}

/**
* The request volume required for an address to be considered for failure percentage
* ejection.
*/
public Builder setRequestVolume(Integer requestVolume) {
checkArgument(requestVolume != null);
this.requestVolume = requestVolume;
return this;
}

/** Builds a new instance of {@link FailurePercentageEjection}. */
public FailurePercentageEjection build() {
return new FailurePercentageEjection(threshold, enforcementPercentage, minimumHosts,
requestVolume);
Expand Down
1 change: 0 additions & 1 deletion core/src/test/java/io/grpc/internal/FakeClock.java
Expand Up @@ -248,7 +248,6 @@ class ScheduleWithFixedDelayTask extends ScheduledTask {

@Override
void run() {
long startTimeNanos = currentTimeNanos;
command.run();
if (!isCancelled()) {
schedule(this, delayNanos, TimeUnit.NANOSECONDS);
Expand Down

0 comments on commit d275d72

Please sign in to comment.