Skip to content

Commit

Permalink
core: pick_first LB to use acceptResolvedAddresses() (grpc#9548)
Browse files Browse the repository at this point in the history
  • Loading branch information
temawi authored and larry-safran committed Oct 6, 2022
1 parent 1b112eb commit 0c2cb1c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 12 deletions.
11 changes: 10 additions & 1 deletion core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,15 @@ final class PickFirstLoadBalancer extends LoadBalancer {
}

@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
if (servers.isEmpty()) {
handleNameResolutionError(Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
+ ", attrs=" + resolvedAddresses.getAttributes()));
return false;
}

if (subchannel == null) {
final Subchannel subchannel = helper.createSubchannel(
CreateSubchannelArgs.newBuilder()
Expand All @@ -67,6 +74,8 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {
} else {
subchannel.updateAddresses(servers);
}

return true;
}

@Override
Expand Down
40 changes: 29 additions & 11 deletions core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import java.net.SocketAddress;
import java.util.List;
Expand Down Expand Up @@ -89,6 +90,8 @@ public void uncaughtException(Thread t, Throwable e) {
@Captor
private ArgumentCaptor<SubchannelPicker> pickerCaptor;
@Captor
private ArgumentCaptor<ConnectivityState> connectivityStateCaptor;
@Captor
private ArgumentCaptor<CreateSubchannelArgs> createArgsCaptor;
@Captor
private ArgumentCaptor<SubchannelStateListener> stateListenerCaptor;
Expand Down Expand Up @@ -121,7 +124,7 @@ public void tearDown() throws Exception {

@Test
public void pickAfterResolved() throws Exception {
loadBalancer.handleResolvedAddresses(
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());

verify(mockHelper).createSubchannel(createArgsCaptor.capture());
Expand All @@ -139,7 +142,7 @@ public void pickAfterResolved() throws Exception {

@Test
public void requestConnectionPicker() throws Exception {
loadBalancer.handleResolvedAddresses(
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());

InOrder inOrder = inOrder(mockHelper, mockSubchannel);
Expand All @@ -164,7 +167,7 @@ public void requestConnectionPicker() throws Exception {

@Test
public void refreshNameResolutionAfterSubchannelConnectionBroken() {
loadBalancer.handleResolvedAddresses(
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
verify(mockHelper).createSubchannel(any(CreateSubchannelArgs.class));

Expand Down Expand Up @@ -196,11 +199,11 @@ public void refreshNameResolutionAfterSubchannelConnectionBroken() {

@Test
public void pickAfterResolvedAndUnchanged() throws Exception {
loadBalancer.handleResolvedAddresses(
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
verify(mockSubchannel).start(any(SubchannelStateListener.class));
verify(mockSubchannel).requestConnection();
loadBalancer.handleResolvedAddresses(
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
verify(mockSubchannel).updateAddresses(eq(servers));
verifyNoMoreInteractions(mockSubchannel);
Expand All @@ -223,7 +226,7 @@ public void pickAfterResolvedAndChanged() throws Exception {

InOrder inOrder = inOrder(mockHelper, mockSubchannel);

loadBalancer.handleResolvedAddresses(
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture());
verify(mockSubchannel).start(any(SubchannelStateListener.class));
Expand All @@ -233,7 +236,7 @@ public void pickAfterResolvedAndChanged() throws Exception {
verify(mockSubchannel).requestConnection();
assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());

loadBalancer.handleResolvedAddresses(
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build());
inOrder.verify(mockSubchannel).updateAddresses(eq(newServers));

Expand All @@ -245,7 +248,7 @@ public void pickAfterResolvedAndChanged() throws Exception {
public void pickAfterStateChangeAfterResolution() throws Exception {
InOrder inOrder = inOrder(mockHelper);

loadBalancer.handleResolvedAddresses(
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture());
CreateSubchannelArgs args = createArgsCaptor.getValue();
Expand Down Expand Up @@ -288,6 +291,21 @@ public void nameResolutionError() throws Exception {
verifyNoMoreInteractions(mockHelper);
}

@Test
public void nameResolutionError_emptyAddressList() throws Exception {
servers.clear();
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
verify(mockHelper).updateBalancingState(connectivityStateCaptor.capture(),
pickerCaptor.capture());
PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertThat(pickResult.getSubchannel()).isNull();
assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(pickResult.getStatus().getDescription()).contains("returned no usable address");
verify(mockSubchannel, never()).requestConnection();
verifyNoMoreInteractions(mockHelper);
}

@Test
public void nameResolutionSuccessAfterError() throws Exception {
InOrder inOrder = inOrder(mockHelper);
Expand All @@ -297,7 +315,7 @@ public void nameResolutionSuccessAfterError() throws Exception {
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
verify(mockSubchannel, never()).requestConnection();

loadBalancer.handleResolvedAddresses(
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture());
CreateSubchannelArgs args = createArgsCaptor.getValue();
Expand All @@ -318,7 +336,7 @@ public void nameResolutionSuccessAfterError() throws Exception {
@Test
public void nameResolutionErrorWithStateChanges() throws Exception {
InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleResolvedAddresses(
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture());
verify(mockSubchannel).start(stateListenerCaptor.capture());
Expand Down Expand Up @@ -358,7 +376,7 @@ public void requestConnection() {
loadBalancer.requestConnection();

verify(mockSubchannel, never()).requestConnection();
loadBalancer.handleResolvedAddresses(
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
verify(mockSubchannel).requestConnection();

Expand Down

0 comments on commit 0c2cb1c

Please sign in to comment.