Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

servlet: force always sending end_stream in trailer frame (Fixes #10124) #10125

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.servlet;

import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;

/**
* Some servlet containers don't support sending trailers only (Tomcat).
* They send an empty data frame with an end_stream flag.
* This is not supported by gRPC as is expects end_stream flag in trailer or trailer-only frame
* To avoid this empty data frame, use this interceptor to force the servlet container to either
* - send a header frame, an empty data frame and a trailer frame with end_stream (Tomcat)
* - send a header frame and a trailer frame with end_stream (Jetty, Undertow)
* This interceptor is added when forcing trailers in the server builder
* {@link ServletServerBuilder#forceTrailers(boolean)}
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10124")
public class ForceTrailersServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
ForceTrailersServerCall<ReqT, RespT> interceptedCall = new ForceTrailersServerCall<>(call);
try {
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(interceptedCall, headers)) {
@Override
public void onMessage(ReqT message) {
try {
super.onMessage(message);
} catch (Throwable t) {
sendHeaders();
throw t;
}
}

@Override
public void onHalfClose() {
try {
super.onHalfClose();
} catch (Throwable t) {
sendHeaders();
throw t;
}
}

@Override
public void onCancel() {
try {
super.onCancel();
} catch (Throwable t) {
sendHeaders();
throw t;
}
}

@Override
public void onComplete() {
try {
super.onComplete();
} catch (Throwable t) {
sendHeaders();
throw t;
}
}

@Override
public void onReady() {
try {
super.onReady();
} catch (Throwable t) {
sendHeaders();
throw t;
}
}

private void sendHeaders() {
interceptedCall.maybeSendEmptyHeaders();
}
};
} catch (RuntimeException e) {
interceptedCall.maybeSendEmptyHeaders();
throw e;
}
}

static class ForceTrailersServerCall<ReqT, RespT> extends
ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {

private volatile boolean headersSent = false;

ForceTrailersServerCall(ServerCall<ReqT, RespT> delegate) {
super(delegate);
}

void maybeSendEmptyHeaders() {
if (!headersSent) {
this.sendHeaders(new Metadata());
}
}

@Override
public void sendHeaders(Metadata headers) {
headersSent = true;
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
super.sendHeaders(headers);
}

@Override
public void close(Status status, Metadata trailers) {
maybeSendEmptyHeaders();
super.close(status, trailers);
}
}
}
25 changes: 25 additions & 0 deletions servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
Expand All @@ -46,6 +47,7 @@
import java.io.File;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -72,6 +74,8 @@ public final class ServletServerBuilder extends ForwardingServerBuilder<ServletS
private boolean internalCaller;
private boolean usingCustomScheduler;
private InternalServerImpl internalServer;
private final List<ServerInterceptor> interceptors = new ArrayList<>();
private boolean forceTrailers;

public ServletServerBuilder() {
serverImplBuilder = new ServerImplBuilder(this::buildTransportServers);
Expand Down Expand Up @@ -102,6 +106,10 @@ public ServletAdapter buildServletAdapter() {
}

private ServerTransportListener buildAndStart() {
interceptors.forEach(serverImplBuilder::intercept);
if (forceTrailers) {
serverImplBuilder.intercept(new ForceTrailersServerInterceptor());
}
Server server;
try {
internalCaller = true;
Expand Down Expand Up @@ -176,6 +184,23 @@ public ServletServerBuilder maxInboundMessageSize(int bytes) {
return this;
}

/**
* Controls trailer behavior to make servlet containers respect the gRPC http2 semantic.
* @param forceTrailers when true, adds {@link ForceTrailersServerInterceptor}
* at the end of the interceptor list
* @return this
*/
public ServletServerBuilder forceTrailers(boolean forceTrailers) {
this.forceTrailers = forceTrailers;
return this;
}

@Override
public ServletServerBuilder intercept(ServerInterceptor interceptor) {
interceptors.add(checkNotNull(interceptor, "interceptor"));
return this;
}

/**
* Provides a custom scheduled executor service to the server builder.
*
Expand Down
62 changes: 39 additions & 23 deletions servlet/src/tomcatTest/java/io/grpc/servlet/TomcatInteropTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

package io.grpc.servlet;

import io.grpc.ForwardingServerCallListener;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.testing.integration.AbstractInteropTest;
import java.io.File;
import io.grpc.testing.integration.Messages;
import org.apache.catalina.Context;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.startup.Tomcat;
Expand All @@ -32,6 +37,8 @@
import org.junit.Ignore;
import org.junit.Test;

import java.io.File;

/**
* Interop test for Tomcat server and Netty client.
*/
Expand All @@ -41,6 +48,7 @@ public class TomcatInteropTest extends AbstractInteropTest {
private static final String MYAPP = "/grpc.testing.TestService";
private int port;
private Tomcat server;
private MutableServerInterceptorHolder mutableServerInterceptorHolder;

@After
@Override
Expand All @@ -60,7 +68,11 @@ public static void cleanUp() throws Exception {

@Override
protected ServerBuilder<?> getServerBuilder() {
return new ServletServerBuilder().maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
mutableServerInterceptorHolder = new MutableServerInterceptorHolder();
return new ServletServerBuilder()
.forceTrailers(true)
.intercept(mutableServerInterceptorHolder)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
}

@Override
Expand Down Expand Up @@ -113,27 +125,31 @@ protected boolean metricsExpected() {
@Test
public void gracefulShutdown() {}

// FIXME
@Override
@Ignore("Tomcat is not able to send trailer only")
@Test
public void specialStatusMessage() {}

// FIXME
@Override
@Ignore("Tomcat is not able to send trailer only")
@Test
public void unimplementedMethod() {}

// FIXME
@Override
@Ignore("Tomcat is not able to send trailer only")
@Test
public void statusCodeAndMessage() {}
public void forceTrailerTest() {
mutableServerInterceptorHolder.delegate = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(call, headers)) {
@Override
public void onReady() {
throw new RuntimeException("onReady");
}
};
}
};

blockingStub.unaryCall(Messages.SimpleRequest.getDefaultInstance());
}

// FIXME
@Override
@Ignore("Tomcat is not able to send trailer only")
@Test
public void emptyStream() {}
private static class MutableServerInterceptorHolder implements ServerInterceptor {
volatile ServerInterceptor delegate;
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
if (delegate != null) {
return delegate.interceptCall(call, headers, next);
}
return next.startCall(call, headers);
}
}
}