check pending stream completion at delayed transport lifecycle #7720

Merged
merged 11 commits into from Dec 21, 2020
105 changes: 79 additions & 26 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
@@ -66,6 +66,23 @@ final class DelayedClientTransport implements ManagedClientTransport {
@Nonnull
@GuardedBy("lock")
private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
@GuardedBy("lock")
private Collection<PendingStream> toCheckCompletionStreams = new LinkedHashSet<>();
private Runnable pollForStreamTransferCompletion = new Runnable() {
Member

Make it final?

@Override
public void run() {
ArrayList<PendingStream> savedToCheckCompletionStreams;
synchronized (lock) {
savedToCheckCompletionStreams = new ArrayList<>(toCheckCompletionStreams);
Member

You don't need to copy the list; just saving the reference should be sufficient.
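
A minimal sketch of the "save the reference" idea from this comment, under stated assumptions: `SwapUnderLockSketch`, `add`, `drain`, and `size` are hypothetical names, and `String` stands in for `PendingStream`. It is not the code that was merged; it only shows that the caller can take ownership of the old collection once the field has been pointed at a fresh empty one inside the lock.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;

// Hypothetical stand-in for the transport; String plays the role of PendingStream.
final class SwapUnderLockSketch {
  private final Object lock = new Object();
  // Guarded by lock. Non-final: the whole collection is swapped out when drained.
  private Collection<String> toCheck = new LinkedHashSet<>();

  void add(String stream) {
    synchronized (lock) {
      if (toCheck.isEmpty()) {
        // After a drain the field holds the immutable empty list, so start a new set.
        toCheck = new LinkedHashSet<>();
      }
      toCheck.add(stream);
    }
  }

  /** Hands the current contents to the caller without copying them. */
  Collection<String> drain() {
    synchronized (lock) {
      Collection<String> saved = toCheck;
      toCheck = Collections.emptyList();
      return saved;
    }
  }

  int size() {
    synchronized (lock) {
      return toCheck.size();
    }
  }
}
```

Because the field no longer refers to the saved collection after the swap, iterating over it outside the lock cannot race with later `add` calls.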

if (!toCheckCompletionStreams.isEmpty()) {
toCheckCompletionStreams = Collections.emptyList();
}
}
for (final PendingStream stream : savedToCheckCompletionStreams) {
stream.awaitStreamTransferCompletion();
}
}
};

/**
* When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
@@ -211,7 +228,7 @@ public void run() {
listener.transportShutdown(status);
}
});
if (!hasPendingStreams() && reportTransportTerminated != null) {
if (!hasPendingStreams() && !hasUncommittedStreams() && reportTransportTerminated != null) {
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
@@ -227,19 +244,27 @@ public void run() {
public final void shutdownNow(Status status) {
shutdown(status);
Collection<PendingStream> savedPendingStreams;
Collection<PendingStream> savedToCheckCompletionStreams;
Runnable savedReportTransportTerminated;
synchronized (lock) {
savedPendingStreams = pendingStreams;
savedToCheckCompletionStreams = toCheckCompletionStreams;
savedReportTransportTerminated = reportTransportTerminated;
reportTransportTerminated = null;
if (!pendingStreams.isEmpty()) {
pendingStreams = Collections.emptyList();
}
if (!toCheckCompletionStreams.isEmpty()) {
toCheckCompletionStreams = Collections.emptyList();
}
}
if (savedReportTransportTerminated != null) {
for (PendingStream stream : savedPendingStreams) {
stream.cancel(status);
}
for (PendingStream stream : savedToCheckCompletionStreams) {
stream.awaitStreamTransferCompletion();
}
syncContext.execute(savedReportTransportTerminated);
}
// If savedReportTransportTerminated == null, transportTerminated() has already been called in
@@ -252,13 +277,26 @@ public final boolean hasPendingStreams() {
}
}

public final boolean hasUncommittedStreams() {
synchronized (lock) {
return !toCheckCompletionStreams.isEmpty();
}
}

@VisibleForTesting
final int getPendingStreamsCount() {
synchronized (lock) {
return pendingStreams.size();
}
}

@VisibleForTesting
final int getUncommittedStreamsCount() {
synchronized (lock) {
return toCheckCompletionStreams.size();
Member

What about using the same name for toCheckCompletionStreams and uncommittedStreams?

}
}

/**
* Use the picker to try picking a transport for every pending stream, proceed the stream if the
* pick is successful, otherwise keep it pending.
@@ -270,48 +308,61 @@ final int getPendingStreamsCount() {
* <p>This method <strong>must not</strong> be called concurrently with itself.
*/
final void reprocess(@Nullable SubchannelPicker picker) {
ArrayList<PendingStream> toProcess;
ArrayList<PendingStream> toCreateRealStream;
ArrayList<PendingStream> toCheckCompletion;
synchronized (lock) {
lastPicker = picker;
lastPickerVersion++;
if (picker == null || !hasPendingStreams()) {
if ((picker == null || !hasPendingStreams()) && !hasUncommittedStreams()) {
Member

I don't think changing this line is necessary. Regardless of hasUncommittedStreams(), if we return here then neither stream.createRealStream() nor drain() is called, so returning early causes no trouble.

Contributor Author

This change covers another case: when shutdown() is called while there are still uncommittedStreams, the shutdown path returns without reporting termination, so the transport would never terminate. It relies on reprocess() to trigger the termination (similar to how cancel() takes care of the last item and then triggers the termination callback). This differs from shutdownNow(), which waits for uncommitted streams itself and then finalizes termination.

Member

> When shutdown() is called but there are still uncommittedStreams, the shutdown path would return, then it will never shutdown. It relied on reprocess() to trigger the termination.

uncommittedStreams can be considered as existing RPCs, so shutdown() should not terminate them. uncommittedStreams will complete transfer by themselves, and they don't rely on a second reprocess().

Contributor Author

shutdown() never terminates the existing RPCs. Do you mean we should await() during shutdown()?
That seems hard to avoid. Say there are both pendingStreams and uncommittedStreams when shutdown() is called, so shutdown() has to return. Then, during the next reprocess(), the newly created stream has not been drained yet, which leaves pendingStreams empty while uncommittedStreams still has items in it, and we need a way to drain it. reprocess() looks like that place: it would be triggered by the next call or after the idle timer.

Member

I think we should await() during shutdown(), because there is no "next" reprocess() after shutdown, so we cannot rely on reprocess() for existing uncommittedStreams.

Another way is to introduce an abstract method DelayedStream.onTransferComplete() and implement DelayedClientTransport.PendingStream.onTransferComplete() to manage decrementing `uncommittedStreams`.

Contributor Author

Thanks. Indeed, it appears that the idle timeout is permanently cancelled immediately after shutdown() is called, so there is no next reprocess(). It looks like just calling awaitTransferCompletion() during shutdown() is not enough. Moreover, I believe we are generally not supposed to await() during shutdown.
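
The callback alternative raised in this thread could look roughly like the sketch below. Everything in it is an assumption made for illustration: `DelayedStreamSketch`, `DelayedTransportSketch`, `completeTransfer`, and `newUncommittedStream` are hypothetical, and only the `onTransferComplete()` name comes from the comment above. The idea is that each stream reports its own transfer completion, so the transport neither blocks in shutdown() nor depends on a later reprocess().

```java
// Hypothetical stand-in for DelayedStream.
abstract class DelayedStreamSketch {
  private boolean transferred; // guarded by this

  /** Hook invoked exactly once, when the hand-off to the real stream has finished. */
  protected abstract void onTransferComplete();

  final void completeTransfer() {
    synchronized (this) {
      if (transferred) {
        return; // ignore repeated notifications
      }
      transferred = true;
    }
    onTransferComplete();
  }
}

// Hypothetical stand-in for DelayedClientTransport.
final class DelayedTransportSketch {
  private final Object lock = new Object();
  private int uncommittedStreams;    // guarded by lock
  private boolean shutdown;          // guarded by lock
  private Runnable reportTerminated; // guarded by lock; null once handed out

  DelayedTransportSketch(Runnable reportTerminated) {
    this.reportTerminated = reportTerminated;
  }

  DelayedStreamSketch newUncommittedStream() {
    synchronized (lock) {
      uncommittedStreams++;
    }
    return new DelayedStreamSketch() {
      @Override
      protected void onTransferComplete() {
        Runnable toRun;
        synchronized (lock) {
          uncommittedStreams--;
          toRun = takeTerminationCallbackIfDone();
        }
        if (toRun != null) {
          toRun.run(); // run outside the lock
        }
      }
    };
  }

  void shutdown() {
    Runnable toRun;
    synchronized (lock) {
      shutdown = true;
      toRun = takeTerminationCallbackIfDone();
    }
    if (toRun != null) {
      toRun.run();
    }
  }

  // Caller must hold lock. Returns the callback at most once.
  private Runnable takeTerminationCallbackIfDone() {
    if (!shutdown || uncommittedStreams > 0) {
      return null;
    }
    Runnable callback = reportTerminated;
    reportTerminated = null;
    return callback;
  }
}
```

In this sketch termination is reported by whichever event happens last, shutdown() or the final transfer completion, which is the property the thread is trying to obtain without waiting.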

return;
}
toProcess = new ArrayList<>(pendingStreams);
toCreateRealStream = new ArrayList<>(pendingStreams);
toCheckCompletion = new ArrayList<>(toCheckCompletionStreams);
}
ArrayList<PendingStream> toRemove = new ArrayList<>();
ArrayList<PendingStream> newlyCreated = new ArrayList<>();

for (final PendingStream stream : toProcess) {
PickResult pickResult = picker.pickSubchannel(stream.args);
CallOptions callOptions = stream.args.getCallOptions();
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
Executor executor = defaultAppExecutor;
// createRealStream may be expensive. It will start real streams on the transport. If
// there are pending requests, they will be serialized too, which may be expensive. Since
// we are now on transport thread, we need to offload the work to an executor.
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
executor.execute(new Runnable() {

if (picker != null) {
for (final PendingStream stream : toCreateRealStream) {
PickResult pickResult = picker.pickSubchannel(stream.args);
CallOptions callOptions = stream.args.getCallOptions();
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
Executor executor = defaultAppExecutor;
// createRealStream may be expensive. It will start real streams on the transport. If
// there are pending requests, they will be serialized too, which may be expensive. Since
// we are now on transport thread, we need to offload the work to an executor.
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
executor.execute(new Runnable() {
@Override
public void run() {
stream.createRealStream(transport);
}
});
toRemove.add(stream);
} // else: stay pending
newlyCreated.add(stream);
} // else: stay pending
}
}
toCheckCompletion.addAll(newlyCreated);
ArrayList<PendingStream> completed = new ArrayList<>();
for (final PendingStream stream : toCheckCompletion) {
if (stream.isStreamTransferCompleted()) {
completed.add(stream);
}
}

synchronized (lock) {
// Between this synchronized and the previous one:
// - Streams may have been cancelled, which may turn pendingStreams into emptiness.
// - shutdown() may be called, which may turn pendingStreams into null.
if (!hasPendingStreams()) {
// - shutdownNow() may be called, which may turn pendingStreams into emptiness.
if (!hasPendingStreams() && !hasUncommittedStreams()) {
return;
}
pendingStreams.removeAll(toRemove);
pendingStreams.removeAll(newlyCreated);
toCheckCompletionStreams.addAll(newlyCreated);
Member

I would prefer making toCheckCompletionStreams be either 'non-final and immutable' or 'final and mutable', but not 'non-final' and 'mutable'.
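
To illustrate the two disciplines named in this comment, here is a small sketch. The class names are hypothetical and `String` stands in for `PendingStream`; neither class is taken from the PR. The first keeps one `final` mutable set and copies on drain, the second never mutates a collection in place and replaces the field on every change.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// "final and mutable": one set for the object's lifetime, edited in place under the lock.
final class FinalMutableSketch {
  private final Object lock = new Object();
  private final Set<String> toCheck = new LinkedHashSet<>(); // guarded by lock

  void add(String stream) {
    synchronized (lock) {
      toCheck.add(stream);
    }
  }

  List<String> drain() {
    synchronized (lock) {
      List<String> saved = new ArrayList<>(toCheck); // a copy is required here
      toCheck.clear();
      return saved;
    }
  }
}

// "non-final and immutable": the field is reassigned, the referenced set is never modified.
final class NonFinalImmutableSketch {
  private final Object lock = new Object();
  private Set<String> toCheck = Collections.emptySet(); // guarded by lock

  void add(String stream) {
    synchronized (lock) {
      Set<String> next = new LinkedHashSet<>(toCheck);
      next.add(stream);
      toCheck = Collections.unmodifiableSet(next);
    }
  }

  Set<String> drain() {
    synchronized (lock) {
      Set<String> saved = toCheck; // safe to share: nobody can modify it
      toCheck = Collections.emptySet();
      return saved;
    }
  }
}
```

The trade-off in this sketch is copy-on-drain versus copy-on-add. Mixing the two styles (a non-final field that is also mutated in place) makes it harder to tell whether a saved reference can still change.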

toCheckCompletionStreams.removeAll(completed);
// Because delayed transport is long-lived, we take this opportunity to down-size the
// hashmap.
if (pendingStreams.isEmpty()) {
@@ -322,9 +373,10 @@ public void run() {
// transport starting streams and setting in-use state. During the gap the whole channel's
// in-use state may be false. However, it shouldn't cause spurious switching to idleness
// (which would shutdown the transports and LoadBalancer) because the gap should be shorter
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (30 millis).
syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null && reportTransportTerminated != null) {
syncContext.executeLater(pollForStreamTransferCompletion);
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
@@ -367,6 +419,7 @@ public void cancel(Status reason) {
if (!hasPendingStreams() && justRemovedAnElement) {
syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null) {
syncContext.executeLater(pollForStreamTransferCompletion);
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
27 changes: 27 additions & 0 deletions core/src/main/java/io/grpc/internal/DelayedStream.java
@@ -29,6 +29,8 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;

/**
@@ -59,6 +61,7 @@ class DelayedStream implements ClientStream {
private long startTimeNanos;
@GuardedBy("this")
private long streamSetTimeNanos;
private final CountDownLatch realStreamStarted = new CountDownLatch(1);

@Override
public void setMaxInboundMessageSize(final int maxSize) {
@@ -132,6 +135,24 @@ final void setStream(ClientStream stream) {
drainPendingCalls();
}

protected boolean isStreamTransferCompleted() {
return realStreamStarted.getCount() == 0;
Member

Note that getCount() is typically used for debugging and testing purposes. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html#getCount()

Better to avoid it in the main source if possible.
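
One possible way to follow this advice, sketched under assumptions: keep a `volatile` flag next to the latch and read the flag instead of calling `getCount()`. `TransferStateSketch` and its method names are hypothetical and are not part of `DelayedStream`. The sketch also restores the interrupt flag only when an `InterruptedException` was actually caught, which is a choice made here for illustration and differs from the diff above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper tracking whether the transfer to the real stream has finished.
final class TransferStateSketch {
  private final CountDownLatch done = new CountDownLatch(1);
  private volatile boolean completed;

  void markCompleted() {
    completed = true; // publish the flag before releasing waiters
    done.countDown(); // extra calls are harmless once the count is zero
  }

  /** Non-blocking check that does not rely on CountDownLatch.getCount(). */
  boolean isCompleted() {
    return completed;
  }

  /** Waits up to the timeout; returns false on timeout or interruption. */
  boolean awaitCompletion(long timeout, TimeUnit unit) {
    try {
      return done.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for callers
      return false;
    }
  }
}
```

Since `countDown()` on a latch whose count is already zero does nothing, callers in this sketch do not need an `isCompleted()` guard before calling `markCompleted()`.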

}

protected void awaitStreamTransferCompletion() {
// Wait until accepted RPCs transfer to the real stream so that we can properly cancel or
// shutdown. Not waiting for transfer completion may cause pending calls orphaned. #636.
boolean delegationComplete;
try {
delegationComplete = realStreamStarted.await(5, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
delegationComplete = false;
}
if (!delegationComplete) {
Thread.currentThread().interrupt();
}
}

/**
* Called to transition {@code passThrough} to {@code true}. This method is not safe to be called
* multiple times; the caller must ensure it will only be called once, ever. {@code this} lock
@@ -221,12 +242,14 @@ public void start(ClientStreamListener listener) {

if (savedPassThrough) {
realStream.start(listener);
realStreamStarted.countDown();
} else {
final ClientStreamListener finalListener = listener;
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.start(finalListener);
realStreamStarted.countDown();
}
});
}
@@ -302,7 +325,11 @@ public void run() {
listenerToClose.closed(reason, new Metadata());
}
drainPendingCalls();
if (!isStreamTransferCompleted()) {
realStreamStarted.countDown();
}
}
awaitStreamTransferCompletion();
}

@GuardedBy("this")