Skip to content

Commit

Permalink
core: DelayedStream should start() real stream immediately
Browse files Browse the repository at this point in the history
DelayedClientTransport needs to avoid becoming terminated while it owns
RPCs. Previously DelayedClientTransport could terminate when some of its
RPCs had their realStream but realStream.start() hadn't yet been called.

To avoid that, we now make sure to call realStream.start()
synchronously with setting realStream. Since start() and the method
calls before start execute quickly, we can run it in-line. But it does
mean we now need to split the Stream methods into "before start" and
"after start" categories for queuing.

Fixes #6283
  • Loading branch information
ejona86 committed Jan 20, 2021
1 parent cd2c168 commit 7b8105e
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 116 deletions.
24 changes: 15 additions & 9 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Expand Up @@ -30,6 +30,7 @@
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -238,7 +239,13 @@ public final void shutdownNow(Status status) {
}
if (savedReportTransportTerminated != null) {
for (PendingStream stream : savedPendingStreams) {
stream.cancel(status);
Runnable runnable = stream.setStream(new FailingClientStream(status, RpcProgress.REFUSED));
if (runnable != null) {
// Drain in-line instead of using an executor as failing stream just throws everything
// away. This is essentially the same behavior as DelayedStream.cancel() but can be done
// before stream.start().
runnable.run();
}
}
syncContext.execute(savedReportTransportTerminated);
}
Expand Down Expand Up @@ -294,12 +301,10 @@ final void reprocess(@Nullable SubchannelPicker picker) {
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
executor.execute(new Runnable() {
@Override
public void run() {
stream.createRealStream(transport);
}
});
Runnable runnable = stream.createRealStream(transport);
if (runnable != null) {
executor.execute(runnable);
}
toRemove.add(stream);
} // else: stay pending
}
Expand Down Expand Up @@ -346,7 +351,8 @@ private PendingStream(PickSubchannelArgs args) {
this.args = args;
}

private void createRealStream(ClientTransport transport) {
/** Runnable may be null. */
private Runnable createRealStream(ClientTransport transport) {
ClientStream realStream;
Context origContext = context.attach();
try {
Expand All @@ -355,7 +361,7 @@ private void createRealStream(ClientTransport transport) {
} finally {
context.detach(origContext);
}
setStream(realStream);
return setStream(realStream);
}

@Override
Expand Down
126 changes: 78 additions & 48 deletions core/src/main/java/io/grpc/internal/DelayedStream.java
Expand Up @@ -29,6 +29,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.CheckReturnValue;
import javax.annotation.concurrent.GuardedBy;

/**
Expand Down Expand Up @@ -59,38 +60,35 @@ class DelayedStream implements ClientStream {
private long startTimeNanos;
@GuardedBy("this")
private long streamSetTimeNanos;
// No need to synchronize; start() synchronization provides a happens-before
private List<Runnable> preStartPendingCalls = new ArrayList<>();

@Override
public void setMaxInboundMessageSize(final int maxSize) {
if (passThrough) {
realStream.setMaxInboundMessageSize(maxSize);
} else {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setMaxInboundMessageSize(maxSize);
}
});
}
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setMaxInboundMessageSize(maxSize);
}
});
}

@Override
public void setMaxOutboundMessageSize(final int maxSize) {
if (passThrough) {
realStream.setMaxOutboundMessageSize(maxSize);
} else {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setMaxOutboundMessageSize(maxSize);
}
});
}
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setMaxOutboundMessageSize(maxSize);
}
});
}

@Override
public void setDeadline(final Deadline deadline) {
delayOrExecute(new Runnable() {
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setDeadline(deadline);
Expand All @@ -115,21 +113,41 @@ public void appendTimeoutInsight(InsightBuilder insight) {
}

/**
* Transfers all pending and future requests and mutations to the given stream.
* Transfers all pending and future requests and mutations to the given stream. Method will return
* quickly, but if the returned Runnable is non-null it must be called to complete the process.
* The Runnable may take a while to execute.
*
* <p>No-op if either this method or {@link #cancel} have already been called.
*/
// When this method returns, passThrough is guaranteed to be true
final void setStream(ClientStream stream) {
// When this method returns, start() has been called on realStream or passThrough is guaranteed to
// be true
@CheckReturnValue
final Runnable setStream(ClientStream stream) {
ClientStreamListener savedListener;
synchronized (this) {
// If realStream != null, then either setStream() or cancel() has been called.
if (realStream != null) {
return;
return null;
}
setRealStream(checkNotNull(stream, "stream"));
savedListener = listener;
if (savedListener == null) {
assert pendingCalls.isEmpty();
pendingCalls = null;
passThrough = true;
}
}
if (savedListener == null) {
return null;
} else {
internalStart(savedListener);
return new Runnable() {
@Override
public void run() {
drainPendingCalls();
}
};
}

drainPendingCalls();
}

/**
Expand Down Expand Up @@ -177,6 +195,7 @@ private void drainPendingCalls() {
* only if {@code runnable} is thread-safe.
*/
private void delayOrExecute(Runnable runnable) {
checkState(listener != null, "May only be called after start");
synchronized (this) {
if (!passThrough) {
pendingCalls.add(runnable);
Expand All @@ -190,7 +209,7 @@ private void delayOrExecute(Runnable runnable) {
public void setAuthority(final String authority) {
checkState(listener == null, "May only be called before start");
checkNotNull(authority, "authority");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setAuthority(authority);
Expand All @@ -200,18 +219,19 @@ public void run() {

@Override
public void start(ClientStreamListener listener) {
checkNotNull(listener, "listener");
checkState(this.listener == null, "already started");

Status savedError;
boolean savedPassThrough;
synchronized (this) {
this.listener = checkNotNull(listener, "listener");
// If error != null, then cancel() has been called and was unable to close the listener
savedError = error;
savedPassThrough = passThrough;
if (!savedPassThrough) {
listener = delayedListener = new DelayedStreamListener(listener);
}
this.listener = listener;
startTimeNanos = System.nanoTime();
}
if (savedError != null) {
Expand All @@ -220,16 +240,20 @@ public void start(ClientStreamListener listener) {
}

if (savedPassThrough) {
realStream.start(listener);
} else {
final ClientStreamListener finalListener = listener;
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.start(finalListener);
}
});
internalStart(listener);
} // else internalStart() will be called by setStream
}

/**
* Starts stream without synchronization. {@code listener} should be same instance as {@link
* #listener}.
*/
private void internalStart(ClientStreamListener listener) {
for (Runnable runnable : preStartPendingCalls) {
runnable.run();
}
preStartPendingCalls = null;
realStream.start(listener);
}

@Override
Expand All @@ -247,6 +271,7 @@ public Attributes getAttributes() {

@Override
public void writeMessage(final InputStream message) {
checkState(listener != null, "May only be called after start");
checkNotNull(message, "message");
if (passThrough) {
realStream.writeMessage(message);
Expand All @@ -262,6 +287,7 @@ public void run() {

@Override
public void flush() {
checkState(listener != null, "May only be called after start");
if (passThrough) {
realStream.flush();
} else {
Expand All @@ -277,16 +303,14 @@ public void run() {
// When this method returns, passThrough is guaranteed to be true
@Override
public void cancel(final Status reason) {
checkState(listener != null, "May only be called after start");
checkNotNull(reason, "reason");
boolean delegateToRealStream = true;
ClientStreamListener listenerToClose = null;
synchronized (this) {
// If realStream != null, then either setStream() or cancel() has been called
if (realStream == null) {
setRealStream(NoopClientStream.INSTANCE);
delegateToRealStream = false;
// If listener == null, then start() will later call listener with 'error'
listenerToClose = listener;
error = reason;
}
}
Expand All @@ -298,10 +322,9 @@ public void run() {
}
});
} else {
if (listenerToClose != null) {
listenerToClose.closed(reason, new Metadata());
}
drainPendingCalls();
// Note that listener is a DelayedStreamListener
listener.closed(reason, new Metadata());
}
}

Expand All @@ -314,6 +337,7 @@ private void setRealStream(ClientStream realStream) {

@Override
public void halfClose() {
checkState(listener != null, "May only be called after start");
delayOrExecute(new Runnable() {
@Override
public void run() {
Expand All @@ -324,6 +348,7 @@ public void run() {

@Override
public void request(final int numMessages) {
checkState(listener != null, "May only be called after start");
if (passThrough) {
realStream.request(numMessages);
} else {
Expand All @@ -338,7 +363,8 @@ public void run() {

@Override
public void optimizeForDirectExecutor() {
delayOrExecute(new Runnable() {
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.optimizeForDirectExecutor();
Expand All @@ -348,8 +374,9 @@ public void run() {

@Override
public void setCompressor(final Compressor compressor) {
checkState(listener == null, "May only be called before start");
checkNotNull(compressor, "compressor");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setCompressor(compressor);
Expand All @@ -359,7 +386,8 @@ public void run() {

@Override
public void setFullStreamDecompression(final boolean fullStreamDecompression) {
delayOrExecute(
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(
new Runnable() {
@Override
public void run() {
Expand All @@ -370,8 +398,9 @@ public void run() {

@Override
public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
checkState(listener == null, "May only be called before start");
checkNotNull(decompressorRegistry, "decompressorRegistry");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setDecompressorRegistry(decompressorRegistry);
Expand All @@ -390,6 +419,7 @@ public boolean isReady() {

@Override
public void setMessageCompression(final boolean enable) {
checkState(listener != null, "May only be called after start");
if (passThrough) {
realStream.setMessageCompression(enable);
} else {
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/io/grpc/internal/MetadataApplierImpl.java
Expand Up @@ -95,7 +95,11 @@ private void finalizeWith(ClientStream stream) {
// returnStream() has been called before me, thus delayedStream must have been
// created.
checkState(delayedStream != null, "delayedStream is null");
delayedStream.setStream(stream);
Runnable slow = delayedStream.setStream(stream);
if (slow != null) {
// TODO(ejona): run this on a separate thread
slow.run();
}
}

/**
Expand Down

0 comments on commit 7b8105e

Please sign in to comment.