-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core: Do not leak server state when application callbacks throw exceptions #3064
Changes from 2 commits
a1db4c6
d0a2ea3
ab44d1f
027e994
65e4d61
b65e09f
0514d6a
7eb4eab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,8 @@ | |
package io.grpc; | ||
|
||
import com.google.common.base.Preconditions; | ||
import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; | ||
import io.grpc.internal.SynchronizedServerCall; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can't have references to internal from io.grpc. I guess move SynchronizedServerCall to io.grpc and make it package-private? Or maybe put it in ForwardingServerCall (and still package-private)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved to io.grpc.util |
||
import java.io.BufferedInputStream; | ||
import java.io.InputStream; | ||
import java.util.ArrayList; | ||
|
@@ -288,4 +290,81 @@ public void onMessage(WReqT message) { | |
} | ||
}; | ||
} | ||
|
||
/** | ||
* A class that intercepts uncaught exceptions of type {@link StatusRuntimeException} and handles | ||
* them by closing the {@link ServerCall}, and transmitting the exception's details to the client. | ||
* | ||
* <p>Without this interceptor, gRPC will strip all details and close the {@link ServerCall} with | ||
* a generic {@link Status#UNKNOWN} code. | ||
* | ||
* <p>Security warning: the {@link Status} and {@link Metadata} may contain sensitive server-side | ||
* state information, and generally should not be sent to clients. Only install this interceptor | ||
* if all clients are trusted. | ||
*/ | ||
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2189") | ||
public static final ServerInterceptor STATUSRUNTIMEXCEPTION_TRANSMITTING_INTERCEPTOR = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not core API, but rather something provided for convenience. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moved StatusRuntimeExceptionTransmitter to its own class There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Expose this as a method, not as a static field. That allows us, in the future, to avoid creating it until first use or similar. Android in particular is hurt quite a bit by static initialization since apps start frequently and want really low memory usage. So while it's fine not to worry about that now, we shouldn't have the API dictate the lifetime. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
new ServerInterceptor() { | ||
@Override | ||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( | ||
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { | ||
final ServerCall<ReqT, RespT> syncServerCall = | ||
new SynchronizedServerCall<ReqT, RespT>(call); | ||
ServerCall.Listener<ReqT> listener = next.startCall(syncServerCall, headers); | ||
return new SimpleForwardingServerCallListener<ReqT>(listener) { | ||
@Override | ||
public void onMessage(ReqT message) { | ||
try { | ||
super.onMessage(message); | ||
} catch (StatusRuntimeException t) { | ||
closeWithException(t); | ||
} | ||
} | ||
|
||
@Override | ||
public void onHalfClose() { | ||
try { | ||
super.onHalfClose(); | ||
} catch (StatusRuntimeException t) { | ||
closeWithException(t); | ||
} | ||
} | ||
|
||
@Override | ||
public void onCancel() { | ||
try { | ||
super.onCancel(); | ||
} catch (StatusRuntimeException t) { | ||
closeWithException(t); | ||
} | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
try { | ||
super.onComplete(); | ||
} catch (StatusRuntimeException t) { | ||
closeWithException(t); | ||
} | ||
} | ||
|
||
@Override | ||
public void onReady() { | ||
try { | ||
super.onReady(); | ||
} catch (StatusRuntimeException t) { | ||
closeWithException(t); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried to analyze the locking order with in-process transport.
On the inbound path, So, this change doesn't impose a deadline scenario. However, if we (or the user) did the same thing on the client side with a @ejona86 is this a valid concern? Should we try to reduce locking, e.g., call callbacks outside of lock in in-process transport? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Per offline discussion, the deadlock issue with in-process transport is legit, while not the fault of this PR. I have filed #3084 However, even without #3084, using
This time, it's the two synchronized calls that are involved in the deadlock. This can be considered application-level code, and if application does that and got deadlock, it's the application's fault. Application shouldn't do it, and it means we should not do it either. Like the proposed solution for #3084, we could also use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reimplemented the functionality via |
||
} | ||
} | ||
|
||
private void closeWithException(StatusRuntimeException t) { | ||
Metadata metadata = Status.trailersFromThrowable(t); | ||
if (metadata == null) { | ||
metadata = new Metadata(); | ||
} | ||
syncServerCall.close(Status.fromThrowable(t), metadata); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
} | ||
}; | ||
} | ||
}; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* Copyright 2017, gRPC Authors All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.grpc.internal; | ||
|
||
import io.grpc.Attributes; | ||
import io.grpc.ForwardingServerCall; | ||
import io.grpc.Metadata; | ||
import io.grpc.ServerCall; | ||
import io.grpc.Status; | ||
import javax.annotation.Nullable; | ||
|
||
/** | ||
* A {@link ServerCall} that wraps around a non thread safe delegate and provides thread safe | ||
* access. | ||
*/ | ||
public class SynchronizedServerCall<ReqT, RespT> extends | ||
ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> { | ||
|
||
public SynchronizedServerCall(ServerCall<ReqT, RespT> delegate) { | ||
super(delegate); | ||
} | ||
|
||
@Override | ||
public synchronized void sendMessage(RespT message) { | ||
super.sendMessage(message); | ||
} | ||
|
||
@Override | ||
public synchronized void request(int numMessages) { | ||
super.request(numMessages); | ||
} | ||
|
||
@Override | ||
public synchronized void sendHeaders(Metadata headers) { | ||
super.sendHeaders(headers); | ||
} | ||
|
||
@Override | ||
public synchronized boolean isReady() { | ||
return super.isReady(); | ||
} | ||
|
||
@Override | ||
public synchronized void close(Status status, Metadata trailers) { | ||
super.close(status, trailers); | ||
} | ||
|
||
@Override | ||
public synchronized boolean isCancelled() { | ||
return super.isCancelled(); | ||
} | ||
|
||
@Override | ||
public synchronized void setMessageCompression(boolean enabled) { | ||
super.setMessageCompression(enabled); | ||
} | ||
|
||
@Override | ||
public synchronized void setCompression(String compressor) { | ||
super.setCompression(compressor); | ||
} | ||
|
||
@Override | ||
public synchronized Attributes getAttributes() { | ||
return super.getAttributes(); | ||
} | ||
|
||
@Nullable | ||
@Override | ||
public synchronized String getAuthority() { | ||
return super.getAuthority(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,10 @@ | |
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertSame; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.junit.Assert.fail; | ||
import static org.mockito.Matchers.any; | ||
import static org.mockito.Matchers.same; | ||
import static org.mockito.Mockito.doThrow; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
|
@@ -68,6 +71,8 @@ public class ServerInterceptorsTest { | |
|
||
private final Metadata headers = new Metadata(); | ||
|
||
private boolean allowListenerInteraction; | ||
|
||
/** Set up for test. */ | ||
@Before | ||
public void setUp() { | ||
|
@@ -85,14 +90,17 @@ public void setUp() { | |
|
||
serviceDefinition = ServerServiceDefinition.builder(new ServiceDescriptor("basic", flowMethod)) | ||
.addMethod(flowMethod, handler).build(); | ||
allowListenerInteraction = false; | ||
} | ||
|
||
/** Final checks for all tests. */ | ||
@After | ||
public void makeSureExpectedMocksUnused() { | ||
verifyZeroInteractions(requestMarshaller); | ||
verifyZeroInteractions(responseMarshaller); | ||
verifyZeroInteractions(listener); | ||
if (!allowListenerInteraction) { | ||
verifyZeroInteractions(listener); | ||
} | ||
} | ||
|
||
@Test(expected = NullPointerException.class) | ||
|
@@ -407,6 +415,36 @@ public void onMessage(ReqT message) { | |
order); | ||
} | ||
|
||
@Test | ||
public void statusRuntimeExceptionTransmittingInterceptor() { | ||
allowListenerInteraction = true; | ||
final Status expectedStatus = Status.UNAVAILABLE; | ||
final Metadata expectedMetadata = new Metadata(); | ||
call = Mockito.spy(call); | ||
final StatusRuntimeException exception = | ||
new StatusRuntimeException(expectedStatus, expectedMetadata); | ||
doThrow(exception).when(listener).onMessage(any(String.class)); | ||
doThrow(exception).when(listener).onCancel(); | ||
doThrow(exception).when(listener).onComplete(); | ||
doThrow(exception).when(listener).onHalfClose(); | ||
doThrow(exception).when(listener).onReady(); | ||
|
||
ServerServiceDefinition intercepted = ServerInterceptors.intercept( | ||
serviceDefinition, | ||
Arrays.asList(ServerInterceptors.STATUSRUNTIMEXCEPTION_TRANSMITTING_INTERCEPTOR)); | ||
try { | ||
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onMessage("hello"); | ||
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onCancel(); | ||
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onComplete(); | ||
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onHalfClose(); | ||
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers).onReady(); | ||
} catch (Throwable t) { | ||
fail("The interceptor should have handled the error by directly closing the ServerCall, " | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This throws away There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
+ "and should not propagate it to the method's caller."); | ||
} | ||
verify(call, times(5)).close(same(expectedStatus), same(expectedMetadata)); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private static ServerMethodDefinition<String, Integer> getSoleMethod( | ||
ServerServiceDefinition serviceDef) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
io.grpc
should not referenceio.grpc.internal
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved StatusRuntimeExceptionTransmitter to its own class under io.grpc.util