Skip to content

Commit

Permalink
core: Fully delegate picks to DelayedClientTransport
Browse files Browse the repository at this point in the history
DelayedClientTransport already had to handle all the cases, so
ManagedChannelImpl picking was acting only as an optimization.
Optimizing DelayedClientTransport to avoid the lock when not queuing
makes ManagedChannelImpl picking entirely redundant, and allows us to
remove the duplicate race-handling logic.

This avoids double-picking when queuing, where ManagedChannelImpl does a
pick, decides to queue, and then DelayedClientTransport re-performs the
pick because it doesn't know which pick version was used. This was
noticed with RLS, which mutates state within the picker.
  • Loading branch information
ejona86 committed May 14, 2024
1 parent d9e09c2 commit 8844cf7
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 88 deletions.
99 changes: 54 additions & 45 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Expand Up @@ -69,23 +69,8 @@ final class DelayedClientTransport implements ManagedClientTransport {
@GuardedBy("lock")
private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();

/**
* When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
* terminated.
*/
@GuardedBy("lock")
private Status shutdownStatus;

/**
* The last picker that {@link #reprocess} has used. May be set to null when the channel has moved
* to idle.
*/
@GuardedBy("lock")
@Nullable
private SubchannelPicker lastPicker;

@GuardedBy("lock")
private long lastPickerVersion;
/** Immutable state needed for picking. 'lock' must be held for writing. */
private volatile PickerState pickerState = new PickerState(null, null);

/**
* Creates a new delayed transport.
Expand Down Expand Up @@ -139,33 +124,30 @@ public final ClientStream newStream(
try {
PickSubchannelArgs args = new PickSubchannelArgsImpl(
method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
SubchannelPicker picker = null;
long pickerVersion = -1;
PickerState state = pickerState;
while (true) {
synchronized (lock) {
if (shutdownStatus != null) {
return new FailingClientStream(shutdownStatus, tracers);
}
if (lastPicker == null) {
return createPendingStream(args, tracers);
}
// Check for second time through the loop, and whether anything changed
if (picker != null && pickerVersion == lastPickerVersion) {
return createPendingStream(args, tracers);
}
picker = lastPicker;
pickerVersion = lastPickerVersion;
if (state.shutdownStatus != null) {
return new FailingClientStream(state.shutdownStatus, tracers);
}
PickResult pickResult = picker.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
return transport.newStream(
args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
tracers);
if (state.lastPicker != null) {
PickResult pickResult = state.lastPicker.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
return transport.newStream(
args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
tracers);
}
}
// This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible
// race with reprocess()), we will buffer it. Otherwise, will try with the new picker.
synchronized (lock) {
PickerState newerState = pickerState;
if (state == newerState) {
return createPendingStream(args, tracers);
}
state = newerState;
}
}
} finally {
syncContext.drain();
Expand Down Expand Up @@ -210,10 +192,10 @@ public ListenableFuture<SocketStats> getStats() {
@Override
public final void shutdown(final Status status) {
synchronized (lock) {
if (shutdownStatus != null) {
if (pickerState.shutdownStatus != null) {
return;
}
shutdownStatus = status;
pickerState = pickerState.withShutdownStatus(status);
syncContext.executeLater(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -288,8 +270,7 @@ final int getPendingStreamsCount() {
final void reprocess(@Nullable SubchannelPicker picker) {
ArrayList<PendingStream> toProcess;
synchronized (lock) {
lastPicker = picker;
lastPickerVersion++;
pickerState = pickerState.withPicker(picker);
if (picker == null || !hasPendingStreams()) {
return;
}
Expand Down Expand Up @@ -338,7 +319,7 @@ final void reprocess(@Nullable SubchannelPicker picker) {
// (which would shutdown the transports and LoadBalancer) because the gap should be shorter
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null && reportTransportTerminated != null) {
if (pickerState.shutdownStatus != null && reportTransportTerminated != null) {
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
Expand Down Expand Up @@ -384,7 +365,7 @@ public void cancel(Status reason) {
boolean justRemovedAnElement = pendingStreams.remove(this);
if (!hasPendingStreams() && justRemovedAnElement) {
syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null) {
if (pickerState.shutdownStatus != null) {
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
Expand All @@ -409,4 +390,32 @@ public void appendTimeoutInsight(InsightBuilder insight) {
super.appendTimeoutInsight(insight);
}
}

static final class PickerState {
/**
* The last picker that {@link #reprocess} has used. May be set to null when the channel has
* moved to idle.
*/
@Nullable
final SubchannelPicker lastPicker;
/**
* When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
* terminated.
*/
@Nullable
final Status shutdownStatus;

private PickerState(SubchannelPicker lastPicker, Status shutdownStatus) {
this.lastPicker = lastPicker;
this.shutdownStatus = shutdownStatus;
}

public PickerState withPicker(SubchannelPicker newPicker) {
return new PickerState(newPicker, this.shutdownStatus);
}

public PickerState withShutdownStatus(Status newShutdownStatus) {
return new PickerState(this.lastPicker, newShutdownStatus);
}
}
}
47 changes: 4 additions & 43 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Expand Up @@ -471,57 +471,20 @@ private void refreshNameResolution() {
private final class ChannelStreamProvider implements ClientStreamProvider {
volatile Throttle throttle;

private ClientTransport getTransport(PickSubchannelArgs args) {
SubchannelPicker pickerCopy = subchannelPicker;
if (shutdown.get()) {
// If channel is shut down, delayedTransport is also shut down which will fail the stream
// properly.
return delayedTransport;
}
if (pickerCopy == null) {
final class ExitIdleModeForTransport implements Runnable {
@Override
public void run() {
exitIdleMode();
}
}

syncContext.execute(new ExitIdleModeForTransport());
return delayedTransport;
}
// There is no need to reschedule the idle timer here.
//
// pickerCopy != null, which means idle timer has not expired when this method starts.
// Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
// which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
// SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
//
// In most cases the idle timer is scheduled to fire after the transport has created the
// stream, which would have reported in-use state to the channel that would have cancelled
// the idle timer.
PickResult pickResult = pickerCopy.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(
pickResult, args.getCallOptions().isWaitForReady());
if (transport != null) {
return transport;
}
return delayedTransport;
}

@Override
public ClientStream newStream(
final MethodDescriptor<?, ?> method,
final CallOptions callOptions,
final Metadata headers,
final Context context) {
// There is no need to reschedule the idle timer here. If the channel isn't shut down, either
// the delayed transport or a real transport will go in-use and cancel the idle timer.
if (!retryEnabled) {
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
callOptions, headers, 0, /* isTransparentRetry= */ false);
ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
method, headers, callOptions, new PickDetailsConsumerImpl(tracers)));
Context origContext = context.attach();
try {
return transport.newStream(method, headers, callOptions, tracers);
return delayedTransport.newStream(method, headers, callOptions, tracers);
} finally {
context.detach(origContext);
}
Expand Down Expand Up @@ -562,11 +525,9 @@ ClientStream newSubstream(
CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
newOptions, newHeaders, previousAttempts, isTransparentRetry);
ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
method, newHeaders, newOptions, new PickDetailsConsumerImpl(tracers)));
Context origContext = context.attach();
try {
return transport.newStream(method, newHeaders, newOptions, tracers);
return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
} finally {
context.detach(origContext);
}
Expand Down

0 comments on commit 8844cf7

Please sign in to comment.