Skip to content

Commit

Permalink
core: DelayedStream should start() real stream immediately
Browse files Browse the repository at this point in the history
Fixes grpc#6283
  • Loading branch information
ejona86 committed Dec 21, 2020
1 parent ec70b64 commit 655fef2
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 61 deletions.
15 changes: 7 additions & 8 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Expand Up @@ -294,12 +294,10 @@ final void reprocess(@Nullable SubchannelPicker picker) {
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
executor.execute(new Runnable() {
@Override
public void run() {
stream.createRealStream(transport);
}
});
Runnable runnable = stream.createRealStream(transport);
if (runnable != null) {
executor.execute(runnable);
}
toRemove.add(stream);
} // else: stay pending
}
Expand Down Expand Up @@ -346,7 +344,8 @@ private PendingStream(PickSubchannelArgs args) {
this.args = args;
}

private void createRealStream(ClientTransport transport) {
/** Runnable may be null. */
private Runnable createRealStream(ClientTransport transport) {
ClientStream realStream;
Context origContext = context.attach();
try {
Expand All @@ -355,7 +354,7 @@ private void createRealStream(ClientTransport transport) {
} finally {
context.detach(origContext);
}
setStream(realStream);
return setStream(realStream);
}

@Override
Expand Down
72 changes: 50 additions & 22 deletions core/src/main/java/io/grpc/internal/DelayedStream.java
Expand Up @@ -29,6 +29,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.CheckReturnValue;
import javax.annotation.concurrent.GuardedBy;

/**
Expand Down Expand Up @@ -59,6 +60,8 @@ class DelayedStream implements ClientStream {
private long startTimeNanos;
@GuardedBy("this")
private long streamSetTimeNanos;
// No need to synchronize; start() synchronization provides a happens-before
private List<Runnable> preStartPendingCalls = new ArrayList<>();

@Override
public void setMaxInboundMessageSize(final int maxSize) {
Expand Down Expand Up @@ -90,7 +93,8 @@ public void run() {

@Override
public void setDeadline(final Deadline deadline) {
delayOrExecute(new Runnable() {
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setDeadline(deadline);
Expand All @@ -115,21 +119,35 @@ public void appendTimeoutInsight(InsightBuilder insight) {
}

/**
* Transfers all pending and future requests and mutations to the given stream.
* Transfers all pending and future requests and mutations to the given stream. Method will return
* quickly, but if the returned Runnable is non-null it must be called to complete the process.
* The Runnable may take a while to execute.
*
* <p>No-op if either this method or {@link #cancel} have already been called.
*/
// When this method returns, passThrough is guaranteed to be true
final void setStream(ClientStream stream) {
// When this method returns, start() has been called on realStream or passThrough is guaranteed to
// be true
@CheckReturnValue
final Runnable setStream(ClientStream stream) {
ClientStreamListener savedListener;
synchronized (this) {
// If realStream != null, then either setStream() or cancel() has been called.
if (realStream != null) {
return;
return null;
}
setRealStream(checkNotNull(stream, "stream"));
savedListener = listener;
}
if (savedListener != null) {
internalStart(savedListener);
}

drainPendingCalls();
return new Runnable() {
@Override
public void run() {
drainPendingCalls();
}
};
}

/**
Expand Down Expand Up @@ -190,7 +208,7 @@ private void delayOrExecute(Runnable runnable) {
public void setAuthority(final String authority) {
checkState(listener == null, "May only be called before start");
checkNotNull(authority, "authority");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setAuthority(authority);
Expand All @@ -200,18 +218,19 @@ public void run() {

@Override
public void start(ClientStreamListener listener) {
checkNotNull(listener, "listener");
checkState(this.listener == null, "already started");

Status savedError;
boolean savedPassThrough;
synchronized (this) {
this.listener = checkNotNull(listener, "listener");
// If error != null, then cancel() has been called and was unable to close the listener
savedError = error;
savedPassThrough = passThrough;
if (!savedPassThrough) {
listener = delayedListener = new DelayedStreamListener(listener);
}
this.listener = listener;
startTimeNanos = System.nanoTime();
}
if (savedError != null) {
Expand All @@ -220,16 +239,20 @@ public void start(ClientStreamListener listener) {
}

if (savedPassThrough) {
realStream.start(listener);
} else {
final ClientStreamListener finalListener = listener;
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.start(finalListener);
}
});
internalStart(listener);
} // else internalStart() will be called by setStream
}

/**
* Starts stream without synchronization. {@code listener} should be same instance as {@link
* #listener}.
*/
private void internalStart(ClientStreamListener listener) {
for (Runnable runnable : preStartPendingCalls) {
runnable.run();
}
preStartPendingCalls = null;
realStream.start(listener);
}

@Override
Expand Down Expand Up @@ -298,10 +321,11 @@ public void run() {
}
});
} else {
drainPendingCalls();
if (listenerToClose != null) {
// Note that listenerToClose is a DelayedStreamListener
listenerToClose.closed(reason, new Metadata());
}
drainPendingCalls();
}
}

Expand Down Expand Up @@ -338,7 +362,8 @@ public void run() {

@Override
public void optimizeForDirectExecutor() {
delayOrExecute(new Runnable() {
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.optimizeForDirectExecutor();
Expand All @@ -348,8 +373,9 @@ public void run() {

@Override
public void setCompressor(final Compressor compressor) {
checkState(listener == null, "May only be called before start");
checkNotNull(compressor, "compressor");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setCompressor(compressor);
Expand All @@ -359,7 +385,8 @@ public void run() {

@Override
public void setFullStreamDecompression(final boolean fullStreamDecompression) {
delayOrExecute(
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(
new Runnable() {
@Override
public void run() {
Expand All @@ -370,8 +397,9 @@ public void run() {

@Override
public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
checkState(listener == null, "May only be called before start");
checkNotNull(decompressorRegistry, "decompressorRegistry");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setDecompressorRegistry(decompressorRegistry);
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/io/grpc/internal/MetadataApplierImpl.java
Expand Up @@ -95,7 +95,11 @@ private void finalizeWith(ClientStream stream) {
// returnStream() has been called before me, thus delayedStream must have been
// created.
checkState(delayedStream != null, "delayedStream is null");
delayedStream.setStream(stream);
Runnable slow = delayedStream.setStream(stream);
if (slow != null) {
// TODO(ejona): run this on a separate thread
slow.run();
}
}

/**
Expand Down
Expand Up @@ -315,6 +315,8 @@ public void uncaughtException(Thread t, Throwable e) {
// Fail-fast streams
DelayedStream ff1 = (DelayedStream) delayedTransport.newStream(
method, headers, failFastCallOptions);
ff1.start(mock(ClientStreamListener.class));
ff1.halfClose();
PickSubchannelArgsImpl ff1args = new PickSubchannelArgsImpl(method, headers,
failFastCallOptions);
verify(transportListener).transportInUse(true);
Expand Down Expand Up @@ -345,6 +347,7 @@ public void uncaughtException(Thread t, Throwable e) {
wfr3Executor.getScheduledExecutorService());
DelayedStream wfr3 = (DelayedStream) delayedTransport.newStream(
method, headers, wfr3callOptions);
wfr3.halfClose();
PickSubchannelArgsImpl wfr3args = new PickSubchannelArgsImpl(method, headers,
wfr3callOptions);
DelayedStream wfr4 = (DelayedStream) delayedTransport.newStream(
Expand Down Expand Up @@ -380,18 +383,22 @@ public void uncaughtException(Thread t, Throwable e) {
inOrder.verify(picker).pickSubchannel(wfr4args);

inOrder.verifyNoMoreInteractions();
// Make sure that real transport creates streams in the executor
verify(mockRealTransport, never()).newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
verify(mockRealTransport2, never()).newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
fakeExecutor.runDueTasks();
assertEquals(0, fakeExecutor.numPendingTasks());
// Make sure that streams are created and started immediately, not in any executor. This is
// necessary during shut down to guarantee that when DelayedClientTransport terminates, all
// streams are now owned by a real transport (which should prevent the Channel from
// terminating).
// ff1 and wfr1 went through
verify(mockRealTransport).newStream(method, headers, failFastCallOptions);
verify(mockRealTransport2).newStream(method, headers, waitForReadyCallOptions);
assertSame(mockRealStream, ff1.getRealStream());
assertSame(mockRealStream2, wfr1.getRealStream());
verify(mockRealStream).start(any(ClientStreamListener.class));
// But also verify that non-start()-related calls are run within the Executor, since they may be
// slow.
verify(mockRealStream, never()).halfClose();
fakeExecutor.runDueTasks();
assertEquals(0, fakeExecutor.numPendingTasks());
verify(mockRealStream).halfClose();
// The ff2 has failed due to picker returning an error
assertSame(Status.UNAVAILABLE, ((FailingClientStream) ff2.getRealStream()).getError());
// Other streams are still buffered
Expand Down Expand Up @@ -430,10 +437,11 @@ public void uncaughtException(Thread t, Throwable e) {
assertSame(mockRealStream2, wfr2.getRealStream());
assertSame(mockRealStream2, wfr4.getRealStream());

assertSame(mockRealStream, wfr3.getRealStream());
// If there is an executor in the CallOptions, it will be used to create the real stream.
assertNull(wfr3.getRealStream());
verify(mockRealStream, times(1)).halfClose(); // 1 for ff1
wfr3Executor.runDueTasks();
assertSame(mockRealStream, wfr3.getRealStream());
verify(mockRealStream, times(2)).halfClose();

// New streams will use the last picker
DelayedStream wfr5 = (DelayedStream) delayedTransport.newStream(
Expand Down

0 comments on commit 655fef2

Please sign in to comment.