Skip to content

Commit

Permalink
WORKING tests are hanging
Browse files Browse the repository at this point in the history
Fixes grpc#6283
  • Loading branch information
ejona86 committed Dec 17, 2020
1 parent ec70b64 commit 25bf8cb
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 60 deletions.
15 changes: 7 additions & 8 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Expand Up @@ -294,12 +294,10 @@ final void reprocess(@Nullable SubchannelPicker picker) {
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
executor.execute(new Runnable() {
@Override
public void run() {
stream.createRealStream(transport);
}
});
Runnable runnable = stream.createRealStream(transport);
if (runnable != null) {
executor.execute(runnable);
}
toRemove.add(stream);
} // else: stay pending
}
Expand Down Expand Up @@ -346,7 +344,8 @@ private PendingStream(PickSubchannelArgs args) {
this.args = args;
}

private void createRealStream(ClientTransport transport) {
/** Runnable may be null. */
private Runnable createRealStream(ClientTransport transport) {
ClientStream realStream;
Context origContext = context.attach();
try {
Expand All @@ -355,7 +354,7 @@ private void createRealStream(ClientTransport transport) {
} finally {
context.detach(origContext);
}
setStream(realStream);
return setStream(realStream);
}

@Override
Expand Down
66 changes: 44 additions & 22 deletions core/src/main/java/io/grpc/internal/DelayedStream.java
Expand Up @@ -29,6 +29,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.CheckReturnValue;
import javax.annotation.concurrent.GuardedBy;

/**
Expand Down Expand Up @@ -60,6 +61,12 @@ class DelayedStream implements ClientStream {
@GuardedBy("this")
private long streamSetTimeNanos;

// start()-related storage. All references should be cleared after start() is called on realStream
// to avoid retaining the objects for the life of the call
/** non-null when setAuthority needs to be called on realStream when it is set. */
// No need to synchronize; start() synchronization provides a happens-before
private String startAuthority;

@Override
public void setMaxInboundMessageSize(final int maxSize) {
if (passThrough) {
Expand Down Expand Up @@ -115,21 +122,35 @@ public void appendTimeoutInsight(InsightBuilder insight) {
}

/**
* Transfers all pending and future requests and mutations to the given stream.
* Transfers all pending and future requests and mutations to the given stream. Method will return
* quickly, but if the returned Runnable is non-null it must be called to complete the process.
* The Runnable may take a while to execute.
*
* <p>No-op if either this method or {@link #cancel} have already been called.
*/
// When this method returns, passThrough is guaranteed to be true
final void setStream(ClientStream stream) {
// When this method returns, start() has been called on realStream or passThrough is guaranteed to
// be true
@CheckReturnValue
final Runnable setStream(ClientStream stream) {
ClientStreamListener savedListener;
synchronized (this) {
// If realStream != null, then either setStream() or cancel() has been called.
if (realStream != null) {
return;
return null;
}
setRealStream(checkNotNull(stream, "stream"));
savedListener = listener;
}
if (savedListener != null) {
internalStart(savedListener);
}

drainPendingCalls();
return new Runnable() {
@Override
public void run() {
drainPendingCalls();
}
};
}

/**
Expand Down Expand Up @@ -190,28 +211,24 @@ private void delayOrExecute(Runnable runnable) {
public void setAuthority(final String authority) {
checkState(listener == null, "May only be called before start");
checkNotNull(authority, "authority");
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setAuthority(authority);
}
});
this.startAuthority = authority;
}

@Override
public void start(ClientStreamListener listener) {
checkNotNull(listener, "listener");
checkState(this.listener == null, "already started");

Status savedError;
boolean savedPassThrough;
synchronized (this) {
this.listener = checkNotNull(listener, "listener");
// If error != null, then cancel() has been called and was unable to close the listener
savedError = error;
savedPassThrough = passThrough;
if (!savedPassThrough) {
listener = delayedListener = new DelayedStreamListener(listener);
}
this.listener = listener;
startTimeNanos = System.nanoTime();
}
if (savedError != null) {
Expand All @@ -220,16 +237,20 @@ public void start(ClientStreamListener listener) {
}

if (savedPassThrough) {
realStream.start(listener);
} else {
final ClientStreamListener finalListener = listener;
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.start(finalListener);
}
});
internalStart(listener);
} // else internalStart() will be called by setStream
}

/**
* Starts stream without synchronization. {@code listener} should be same instance as {@link
* #listener}.
*/
private void internalStart(ClientStreamListener listener) {
if (startAuthority != null) {
realStream.setAuthority(startAuthority);
startAuthority = null;
}
realStream.start(listener);
}

@Override
Expand Down Expand Up @@ -298,10 +319,11 @@ public void run() {
}
});
} else {
drainPendingCalls();
if (listenerToClose != null) {
// Note that listenerToClose is a DelayedStreamListener
listenerToClose.closed(reason, new Metadata());
}
drainPendingCalls();
}
}

Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/io/grpc/internal/MetadataApplierImpl.java
Expand Up @@ -95,7 +95,11 @@ private void finalizeWith(ClientStream stream) {
// returnStream() has been called before me, thus delayedStream must have been
// created.
checkState(delayedStream != null, "delayedStream is null");
delayedStream.setStream(stream);
Runnable slow = delayedStream.setStream(stream);
if (slow != null) {
// TODO(ejona): run this on a separate thread
slow.run();
}
}

/**
Expand Down
Expand Up @@ -315,6 +315,8 @@ public void uncaughtException(Thread t, Throwable e) {
// Fail-fast streams
DelayedStream ff1 = (DelayedStream) delayedTransport.newStream(
method, headers, failFastCallOptions);
ff1.start(mock(ClientStreamListener.class));
ff1.halfClose();
PickSubchannelArgsImpl ff1args = new PickSubchannelArgsImpl(method, headers,
failFastCallOptions);
verify(transportListener).transportInUse(true);
Expand Down Expand Up @@ -345,6 +347,7 @@ public void uncaughtException(Thread t, Throwable e) {
wfr3Executor.getScheduledExecutorService());
DelayedStream wfr3 = (DelayedStream) delayedTransport.newStream(
method, headers, wfr3callOptions);
wfr3.halfClose();
PickSubchannelArgsImpl wfr3args = new PickSubchannelArgsImpl(method, headers,
wfr3callOptions);
DelayedStream wfr4 = (DelayedStream) delayedTransport.newStream(
Expand Down Expand Up @@ -380,18 +383,22 @@ public void uncaughtException(Thread t, Throwable e) {
inOrder.verify(picker).pickSubchannel(wfr4args);

inOrder.verifyNoMoreInteractions();
// Make sure that real transport creates streams in the executor
verify(mockRealTransport, never()).newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
verify(mockRealTransport2, never()).newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
fakeExecutor.runDueTasks();
assertEquals(0, fakeExecutor.numPendingTasks());
// Make sure that streams are created and started immediately, not in any executor. This is
// necessary during shut down to guarantee that when DelayedClientTransport terminates, all
// streams are now owned by a real transport (which should prevent the Channel from
// terminating).
// ff1 and wfr1 went through
verify(mockRealTransport).newStream(method, headers, failFastCallOptions);
verify(mockRealTransport2).newStream(method, headers, waitForReadyCallOptions);
assertSame(mockRealStream, ff1.getRealStream());
assertSame(mockRealStream2, wfr1.getRealStream());
verify(mockRealStream).start(any(ClientStreamListener.class));
// But also verify that non-start()-related calls are run within the Executor, since they may be
// slow.
verify(mockRealStream, never()).halfClose();
fakeExecutor.runDueTasks();
assertEquals(0, fakeExecutor.numPendingTasks());
verify(mockRealStream).halfClose();
// The ff2 has failed due to picker returning an error
assertSame(Status.UNAVAILABLE, ((FailingClientStream) ff2.getRealStream()).getError());
// Other streams are still buffered
Expand Down Expand Up @@ -430,10 +437,11 @@ public void uncaughtException(Thread t, Throwable e) {
assertSame(mockRealStream2, wfr2.getRealStream());
assertSame(mockRealStream2, wfr4.getRealStream());

assertSame(mockRealStream, wfr3.getRealStream());
// If there is an executor in the CallOptions, it will be used to create the real stream.
assertNull(wfr3.getRealStream());
verify(mockRealStream, times(1)).halfClose(); // 1 for ff1
wfr3Executor.runDueTasks();
assertSame(mockRealStream, wfr3.getRealStream());
verify(mockRealStream, times(2)).halfClose();

// New streams will use the last picker
DelayedStream wfr5 = (DelayedStream) delayedTransport.newStream(
Expand Down

0 comments on commit 25bf8cb

Please sign in to comment.