Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: delay CallCredentialsApplyingTransport shutdown until metadataApplier finalized #7813

Merged
merged 8 commits into from Jan 26, 2021
Expand Up @@ -71,7 +71,7 @@ private class CallCredentialsApplyingTransport extends ForwardingConnectionClien
private final String authority;
// Negative value means transport active, non-negative value indicates shutdown invoked.
private final AtomicInteger pendingApplier = new AtomicInteger(Integer.MIN_VALUE + 1);
private final Status shutdownStatus = Status.UNAVAILABLE;
private volatile Status shutdownStatus;
@GuardedBy("this")
private Status savedShutdownStatus;
@GuardedBy("this")
Expand Down Expand Up @@ -157,7 +157,10 @@ public void shutdown(Status status) {
checkNotNull(status, "status");
synchronized (this) {
if (pendingApplier.get() < 0) {
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
shutdownStatus = status;
pendingApplier.addAndGet(Integer.MAX_VALUE);
} else {
return;
}
if (pendingApplier.get() != 0) {
savedShutdownStatus = status;
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -167,16 +170,23 @@ public void shutdown(Status status) {
super.shutdown(status);
}

// TODO(zivy@): should call delegate shutdownNow asap. Maybe cancel pending applier.
// TODO(zivy): cancel pending applier here.
@Override
public void shutdownNow(Status status) {
checkNotNull(status, "status");
synchronized (this) {
if (pendingApplier.get() < 0) {
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
shutdownStatus = status;
if (savedShutdownStatus == null) {
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
savedShutdownStatus = status;
}
pendingApplier.addAndGet(Integer.MAX_VALUE);
} else if (savedShutdownNowStatus != null) {
return;
}
if (pendingApplier.get() != 0) {
savedShutdownNowStatus = status;
// TODO(zivy): propagate shutdownNow to the delegate immediately.
return;
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand Down
Expand Up @@ -296,6 +296,28 @@ public void delayedShutdown_shutdownThenApplyThenShutdownNow() {
verify(mockTransport).shutdown(Status.UNAVAILABLE);
transport.shutdownNow(Status.ABORTED);
verify(mockTransport).shutdownNow(Status.ABORTED);

transport.shutdown(Status.UNAVAILABLE);
verify(mockTransport).shutdown(Status.UNAVAILABLE);
transport.shutdownNow(Status.ABORTED);
verify(mockTransport, times(2)).shutdownNow(Status.ABORTED);
}

@Test
public void delayedShutdownNow() {
transport.newStream(method, origHeaders, callOptions);
ArgumentCaptor<CallCredentials.MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null);
verify(mockCreds).applyRequestMetadata(any(RequestInfo.class),
same(mockExecutor), applierCaptor.capture());
transport.shutdownNow(Status.ABORTED);
assertTrue(transport.newStream(method, origHeaders, callOptions)
instanceof FailingClientStream);
verify(mockTransport, never()).shutdown(any(Status.class));
Metadata headers = new Metadata();
headers.put(CREDS_KEY, CREDS_VALUE);
applierCaptor.getValue().apply(headers);
verify(mockTransport).shutdown(Status.ABORTED);
verify(mockTransport).shutdownNow(Status.ABORTED);
}

@Test
Expand Down