Skip to content

Commit

Permalink
servlet: force always sending end_stream in trailer frame (Fixes #10124)
Browse files Browse the repository at this point in the history
Signed-off-by: hypnoce <hypnoce@donarproject.org>
  • Loading branch information
hypnoce committed May 3, 2023
1 parent fbc8679 commit 0d0d716
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1098,11 +1098,11 @@ public void earlyServerClose_noServerHeaders() throws Exception {
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
checkClientStatus(status, clientStreamStatus);
assertEquals(
Lists.newArrayList(trailers.getAll(asciiKey)),
Lists.newArrayList(clientStreamTrailers.getAll(asciiKey)));
String.join(",", trailers.getAll(asciiKey)),
String.join(",", clientStreamTrailers.getAll(asciiKey)));
assertEquals(
Lists.newArrayList(trailers.getAll(binaryKey)),
Lists.newArrayList(clientStreamTrailers.getAll(binaryKey)));
String.join(",", trailers.getAll(binaryKey)),
String.join(",", clientStreamTrailers.getAll(binaryKey)));
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void start(ServerListener listener) throws IOException {
listener.transportCreated(new ServletServerBuilder.ServerTransportImpl(scheduler));
ServletAdapter adapter =
new ServletAdapter(serverTransportListener, streamTracerFactories,
Integer.MAX_VALUE);
Integer.MAX_VALUE, false);
GrpcServlet grpcServlet = new GrpcServlet(adapter);

jettyServer = new Server(0);
Expand Down
8 changes: 6 additions & 2 deletions servlet/src/main/java/io/grpc/servlet/ServletAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,17 @@ public final class ServletAdapter {
private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
private final int maxInboundMessageSize;
private final Attributes attributes;
private final boolean forceTrailers;

ServletAdapter(
ServerTransportListener transportListener,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
int maxInboundMessageSize) {
int maxInboundMessageSize,
boolean forceTrailers) {
this.transportListener = transportListener;
this.streamTracerFactories = streamTracerFactories;
this.maxInboundMessageSize = maxInboundMessageSize;
this.forceTrailers = forceTrailers;
attributes = transportListener.transportReady(Attributes.EMPTY);
}

Expand Down Expand Up @@ -148,7 +151,8 @@ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOEx
new InetSocketAddress(req.getLocalAddr(), req.getLocalPort()))
.build(),
getAuthority(req),
logId);
logId,
forceTrailers);

transportListener.streamCreated(stream, method, headers);
stream.transportState().runOnTransportThread(stream.transportState()::onStreamAllocated);
Expand Down
17 changes: 16 additions & 1 deletion servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public final class ServletServerBuilder extends ForwardingServerBuilder<ServletS
private boolean internalCaller;
private boolean usingCustomScheduler;
private InternalServerImpl internalServer;
private boolean forceTrailers;

public ServletServerBuilder() {
serverImplBuilder = new ServerImplBuilder(this::buildTransportServers);
Expand All @@ -98,7 +99,8 @@ public Server build() {
* Creates a {@link ServletAdapter}.
*/
public ServletAdapter buildServletAdapter() {
return new ServletAdapter(buildAndStart(), streamTracerFactories, maxInboundMessageSize);
return new ServletAdapter(buildAndStart(), streamTracerFactories, maxInboundMessageSize,
forceTrailers);
}

private ServerTransportListener buildAndStart() {
Expand Down Expand Up @@ -176,6 +178,19 @@ public ServletServerBuilder maxInboundMessageSize(int bytes) {
return this;
}

/**
* Some servlet containers don't support sending trailers only (Tomcat).
* They send an empty data frame with an end_stream flag.
* This is not supported by gRPC as is expects end_stream flag in trailer or trailer-only frame
* To avoid this empty data frame, force the servlet container to either
* - send a header frame, an empty data frame and a trailer frame with end_stream (Tomcat)
* - send a header frame and a trailer frame with end_stream (Jetty, Undertow)
*/
public ServletServerBuilder forceTrailers(boolean forceTrailers) {
this.forceTrailers = forceTrailers;
return this;

Check warning on line 191 in servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java

View check run for this annotation

Codecov / codecov/patch

servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java#L190-L191

Added lines #L190 - L191 were not covered by tests
}

/**
* Provides a custom scheduled executor service to the server builder.
*
Expand Down
32 changes: 22 additions & 10 deletions servlet/src/main/java/io/grpc/servlet/ServletServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,16 @@ final class ServletServerStream extends AbstractServerStream {
private final String authority;
private final InternalLogId logId;
private final AsyncServletOutputStreamWriter writer;
private final boolean forceTrailers;

ServletServerStream(
AsyncContext asyncCtx,
StatsTraceContext statsTraceCtx,
int maxInboundMessageSize,
Attributes attributes,
String authority,
InternalLogId logId) throws IOException {
InternalLogId logId,
boolean forceTrailers) throws IOException {
super(ByteArrayWritableBuffer::new, statsTraceCtx);
transportState =
new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer());
Expand All @@ -82,6 +84,7 @@ final class ServletServerStream extends AbstractServerStream {
this.resp = (HttpServletResponse) asyncCtx.getResponse();
this.writer = new AsyncServletOutputStreamWriter(
asyncCtx, transportState, logId);
this.forceTrailers = forceTrailers;
resp.getOutputStream().setWriteListener(new GrpcWriteListener());
}

Expand Down Expand Up @@ -276,20 +279,29 @@ public void writeTrailers(Metadata trailers, boolean headersSent, Status status)
new Object[] {logId, trailers, headersSent, status});
}
if (!headersSent) {
writeHeadersToServletResponse(trailers);
} else {
byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(trailers);
for (int i = 0; i < serializedHeaders.length; i += 2) {
String key = new String(serializedHeaders[i], StandardCharsets.US_ASCII);
String newValue = new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII);
trailerSupplier.get().computeIfPresent(key, (k, v) -> v + "," + newValue);
trailerSupplier.get().putIfAbsent(key, newValue);
if (forceTrailers) {
writeHeadersToServletResponse(new Metadata());
resp.setTrailerFields(trailerSupplier);
serializeTrailers(trailers);
} else {
writeHeadersToServletResponse(trailers);
}
} else {
serializeTrailers(trailers);
}

writer.complete();
}

private void serializeTrailers(Metadata trailers) {
byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(trailers);
for (int i = 0; i < serializedHeaders.length; i += 2) {
String key = new String(serializedHeaders[i], StandardCharsets.US_ASCII);
String newValue = new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII);
trailerSupplier.get().computeIfPresent(key, (k, v) -> v + "," + newValue);
trailerSupplier.get().putIfAbsent(key, newValue);
}
}

@Override
public void cancel(Status status) {
if (resp.isCommitted() && Code.DEADLINE_EXCEEDED == status.getCode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public void start(ServerListener listener) throws IOException {
ServerTransportListener serverTransportListener =
listener.transportCreated(new ServerTransportImpl(scheduler));
ServletAdapter adapter =
new ServletAdapter(serverTransportListener, streamTracerFactories, Integer.MAX_VALUE);
new ServletAdapter(serverTransportListener, streamTracerFactories, Integer.MAX_VALUE,
true);
GrpcServlet grpcServlet = new GrpcServlet(adapter);

tomcatServer = new Tomcat();
Expand Down Expand Up @@ -248,21 +249,6 @@ public void interactionsAfterClientStreamCancelAreNoops() {}
@Test
public void clientCancel() {}

@Override
@Ignore("Tomcat does not support trailers only")
@Test
public void earlyServerClose_noServerHeaders() {}

@Override
@Ignore("Tomcat does not support trailers only")
@Test
public void earlyServerClose_serverFailure() {}

@Override
@Ignore("Tomcat does not support trailers only")
@Test
public void earlyServerClose_serverFailure_withClientCancelOnListenerClosed() {}

@Override
@Ignore("regression since bumping grpc v1.46 to v1.53")
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public void start(ServerListener listener) throws IOException {
ServerTransportListener serverTransportListener =
listener.transportCreated(new ServerTransportImpl(scheduler));
ServletAdapter adapter =
new ServletAdapter(serverTransportListener, streamTracerFactories, Integer.MAX_VALUE);
new ServletAdapter(serverTransportListener, streamTracerFactories, Integer.MAX_VALUE,
false);
GrpcServlet grpcServlet = new GrpcServlet(adapter);
InstanceFactory<? extends Servlet> instanceFactory =
() -> new ImmediateInstanceHandle<>(grpcServlet);
Expand Down

0 comments on commit 0d0d716

Please sign in to comment.