Skip to content

Commit

Permalink
xds: least_request LB to use acceptResolvedAddresses() (#9616)
Browse files Browse the repository at this point in the history
This is part of a migration to move all LBs away from using
handleResolvedAddresses().
  • Loading branch information
temawi committed Nov 2, 2022
1 parent a65ecef commit 39c2646
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 23 deletions.
10 changes: 9 additions & 1 deletion xds/src/main/java/io/grpc/xds/LeastRequestLoadBalancer.java
Expand Up @@ -88,7 +88,13 @@ final class LeastRequestLoadBalancer extends LoadBalancer {
}

@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
if (resolvedAddresses.getAddresses().isEmpty()) {
handleNameResolutionError(Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
+ ", attrs=" + resolvedAddresses.getAttributes()));
return false;
}
LeastRequestConfig config =
(LeastRequestConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
// Config may be null if least_request is used outside xDS
Expand Down Expand Up @@ -146,6 +152,8 @@ public void onSubchannelState(ConnectivityStateInfo state) {
for (Subchannel removedSubchannel : removedSubchannels) {
shutdownSubchannel(removedSubchannel);
}

return true;
}

@Override
Expand Down
55 changes: 33 additions & 22 deletions xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java
Expand Up @@ -155,8 +155,9 @@ public void tearDown() throws Exception {
@Test
public void pickAfterResolved() throws Exception {
final Subchannel readySubchannel = subchannels.values().iterator().next();
loadBalancer.handleResolvedAddresses(
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
assertThat(addressesAccepted).isTrue();
deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));

verify(mockHelper, times(3)).createSubchannel(createArgsCaptor.capture());
Expand Down Expand Up @@ -206,9 +207,10 @@ public void pickAfterResolvedUpdatedHosts() throws Exception {

InOrder inOrder = inOrder(mockHelper);

loadBalancer.handleResolvedAddresses(
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(currentServers).setAttributes(affinity)
.build());
assertThat(addressesAccepted).isTrue();

inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());

Expand All @@ -228,8 +230,9 @@ public void pickAfterResolvedUpdatedHosts() throws Exception {
// This time with Attributes
List<EquivalentAddressGroup> latestServers = Lists.newArrayList(oldEag2, newEag);

loadBalancer.handleResolvedAddresses(
addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(latestServers).setAttributes(affinity).build());
assertThat(addressesAccepted).isTrue();

verify(newSubchannel, times(1)).requestConnection();
verify(oldSubchannel, times(1)).updateAddresses(Arrays.asList(oldEag2));
Expand All @@ -247,25 +250,16 @@ public void pickAfterResolvedUpdatedHosts() throws Exception {
picker = pickerCaptor.getValue();
assertThat(getList(picker)).containsExactly(oldSubchannel, newSubchannel);

// test going from non-empty to empty
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(affinity)
.build());

inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertEquals(PickResult.withNoResult(), pickerCaptor.getValue().pickSubchannel(mockArgs));

verifyNoMoreInteractions(mockHelper);
}

@Test
public void pickAfterStateChange() throws Exception {
InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleResolvedAddresses(
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(Attributes.EMPTY)
.build());
assertThat(addressesAccepted).isTrue();
Subchannel subchannel = loadBalancer.getSubchannels().iterator().next();
Ref<ConnectivityStateInfo> subchannelStateInfo = subchannel.getAttributes().get(
STATE_INFO);
Expand Down Expand Up @@ -305,9 +299,10 @@ public void pickAfterConfigChange() {
final LeastRequestConfig oldConfig = new LeastRequestConfig(4);
final LeastRequestConfig newConfig = new LeastRequestConfig(6);
final Subchannel readySubchannel = subchannels.values().iterator().next();
loadBalancer.handleResolvedAddresses(
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity)
.setLoadBalancingPolicyConfig(oldConfig).build());
assertThat(addressesAccepted).isTrue();
deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
verify(mockHelper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(mockHelper, times(2))
Expand All @@ -317,9 +312,10 @@ public void pickAfterConfigChange() {
pickerCaptor.getValue().pickSubchannel(mockArgs);
verify(mockRandom, times(oldConfig.choiceCount)).nextInt(1);

loadBalancer.handleResolvedAddresses(
addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity)
.setLoadBalancingPolicyConfig(newConfig).build());
assertThat(addressesAccepted).isTrue();
verify(mockHelper, times(3))
.updateBalancingState(any(ConnectivityState.class), pickerCaptor.capture());

Expand All @@ -332,9 +328,10 @@ public void pickAfterConfigChange() {
@Test
public void ignoreShutdownSubchannelStateChange() {
InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleResolvedAddresses(
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(Attributes.EMPTY)
.build());
assertThat(addressesAccepted).isTrue();
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class));

loadBalancer.shutdown();
Expand All @@ -351,9 +348,10 @@ public void ignoreShutdownSubchannelStateChange() {
@Test
public void stayTransientFailureUntilReady() {
InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleResolvedAddresses(
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(Attributes.EMPTY)
.build());
assertThat(addressesAccepted).isTrue();

inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class));

Expand Down Expand Up @@ -389,9 +387,10 @@ public void stayTransientFailureUntilReady() {
@Test
public void refreshNameResolutionWhenSubchannelConnectionBroken() {
InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleResolvedAddresses(
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(Attributes.EMPTY)
.build());
assertThat(addressesAccepted).isTrue();

verify(mockHelper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class));
Expand Down Expand Up @@ -419,10 +418,11 @@ public void refreshNameResolutionWhenSubchannelConnectionBroken() {
public void pickerLeastRequest() throws Exception {
int choiceCount = 2;
// This should add inFlight counters to all subchannels.
loadBalancer.handleResolvedAddresses(
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(Attributes.EMPTY)
.setLoadBalancingPolicyConfig(new LeastRequestConfig(choiceCount))
.build());
assertThat(addressesAccepted).isTrue();

assertEquals(3, loadBalancer.getSubchannels().size());

Expand Down Expand Up @@ -505,10 +505,11 @@ public void nameResolutionErrorWithNoChannels() throws Exception {
public void nameResolutionErrorWithActiveChannels() throws Exception {
int choiceCount = 8;
final Subchannel readySubchannel = subchannels.values().iterator().next();
loadBalancer.handleResolvedAddresses(
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setLoadBalancingPolicyConfig(new LeastRequestConfig(choiceCount))
.setAddresses(servers).setAttributes(affinity).build());
assertThat(addressesAccepted).isTrue();
deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));

Expand Down Expand Up @@ -538,9 +539,10 @@ public void subchannelStateIsolation() throws Exception {
Subchannel sc2 = subchannelIterator.next();
Subchannel sc3 = subchannelIterator.next();

loadBalancer.handleResolvedAddresses(
boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(Attributes.EMPTY)
.build());
assertThat(addressesAccepted).isTrue();
verify(sc1, times(1)).requestConnection();
verify(sc2, times(1)).requestConnection();
verify(sc3, times(1)).requestConnection();
Expand Down Expand Up @@ -613,6 +615,15 @@ public void internalPickerComparisons() {
assertFalse(ready5.isEquivalentTo(ready6));
}

@Test
public void emptyAddresses() {
assertThat(loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(affinity)
.build())).isFalse();
}

private static List<Subchannel> getList(SubchannelPicker picker) {
return picker instanceof ReadyPicker ? ((ReadyPicker) picker).getList() :
Collections.<Subchannel>emptyList();
Expand Down

0 comments on commit 39c2646

Please sign in to comment.