Skip to content

Commit

Permalink
DRAFT: Add support for grpc-web
Browse files Browse the repository at this point in the history
This is a basic implementation of the grpc-web protocol over HTTP/2.

Do NOT use this as-is in production; it does not properly configure
HTTP access controls through CORS.

Unfortunately, some of the changes here are a bit at odds with the
current class structure:
- The Deframer is wrapped, but the code expects it *not* to be wrapped
  in some places
- The Framer uses Sink which we overwrite with custom implementations
  of framing (to implement Base64 encoding) and writeTrailers

It also also unclear to me under what circumstances headers may not be
written yet when trailers are sent.

Also note that this *DOES NOT* add support for grpc-web over HTTP/1.1.

Related to grpc#4823.
  • Loading branch information
ulfjack committed Mar 14, 2021
1 parent c3caafa commit 18dc3d4
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ protected TransportState(
int maxMessageSize,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
super(maxMessageSize, statsTraceCtx, transportTracer);
super(maxMessageSize, statsTraceCtx, transportTracer, Protocol.GRPC);
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
}

Expand Down
8 changes: 5 additions & 3 deletions core/src/main/java/io/grpc/internal/AbstractServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public StatsTraceContext statsTraceContext() {
}

/**
* This should only called from the transport thread (except for private interactions with
* This should only be called from the transport thread (except for private interactions with
* {@code AbstractServerStream}).
*/
protected abstract static class TransportState extends AbstractStream.TransportState {
Expand All @@ -191,11 +191,13 @@ protected abstract static class TransportState extends AbstractStream.TransportS
protected TransportState(
int maxMessageSize,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
TransportTracer transportTracer,
Protocol protocol) {
super(
maxMessageSize,
statsTraceCtx,
Preconditions.checkNotNull(transportTracer, "transportTracer"));
Preconditions.checkNotNull(transportTracer, "transportTracer"),
protocol);
this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx");
}

Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/io/grpc/internal/AbstractStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ public abstract static class TransportState
protected TransportState(
int maxMessageSize,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) {
TransportTracer transportTracer,
Protocol protocol) {
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
this.transportTracer = checkNotNull(transportTracer, "transportTracer");
rawDeframer = new MessageDeframer(
Expand All @@ -158,7 +159,7 @@ protected TransportState(
statsTraceCtx,
transportTracer);
// TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break.
deframer = rawDeframer;
deframer = protocol == Protocol.GRPC_WEB_TEXT ? new Base64Deframer(rawDeframer) : rawDeframer;
}

final void optimizeForDirectExecutor() {
Expand All @@ -168,6 +169,7 @@ final void optimizeForDirectExecutor() {

protected void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
rawDeframer.setFullStreamDecompressor(fullStreamDecompressor);
// TODO: Do we need to rewrap with Base64Deframer?
deframer = new ApplicationThreadDeframer(this, this, rawDeframer);
}

Expand Down
66 changes: 66 additions & 0 deletions core/src/main/java/io/grpc/internal/Base64Deframer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import io.grpc.Decompressor;
import java.util.Base64;

public class Base64Deframer implements Deframer {
private final Deframer delegate;

public Base64Deframer(Deframer delegate) {
this.delegate = delegate;
}

@Override
public void setMaxInboundMessageSize(int messageSize) {
delegate.setMaxInboundMessageSize(messageSize);
}

@Override
public void setDecompressor(Decompressor decompressor) {
delegate.setDecompressor(decompressor);
}

@Override
public void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
delegate.setFullStreamDecompressor(fullStreamDecompressor);
}

@Override
public void request(int numMessages) {
delegate.request(numMessages);
}

@Override
public void deframe(ReadableBuffer data) {
// TODO: Make this more efficient.
byte[] decoded = Base64.getDecoder().decode(ReadableBuffers.readArray(data));
data.close();
delegate.deframe(ReadableBuffers.wrap(decoded));
}

@Override
public void closeWhenComplete() {
delegate.closeWhenComplete();
}

@Override
public void close() {
delegate.close();
}
}
12 changes: 12 additions & 0 deletions core/src/main/java/io/grpc/internal/GrpcUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ public byte[] parseAsciiString(byte[] serialized) {
*/
public static final String CONTENT_TYPE_GRPC = "application/grpc";

public static final String CONTENT_TYPE_GRPC_WEB = "application/grpc-web";
public static final String CONTENT_TYPE_GRPC_WEB_TEXT = "application/grpc-web-text";
public static final String CONTENT_TYPE_GRPC_WEB_TEXT_PROTO = "application/grpc-web-text+proto";

/**
* The HTTP method used for GRPC requests.
*/
Expand Down Expand Up @@ -432,6 +436,14 @@ public static boolean isGrpcContentType(String contentType) {
return nextChar == '+' || nextChar == ';';
}

public static boolean isGrpcWebContentType(String contentType) {
if (contentType == null) {
return false;
}
// TODO: Also support grpc-web, grpc-web+proto, and grpc-web-text+proto.
return CONTENT_TYPE_GRPC_WEB_TEXT.equals(contentType.toLowerCase());
}

/**
* Gets the User-Agent string for the gRPC transport.
*/
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/java/io/grpc/internal/Protocol.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

public enum Protocol {
GRPC,
GRPC_WEB_TEXT;
}
22 changes: 17 additions & 5 deletions netty/src/main/java/io/grpc/netty/GrpcHttp2OutboundHeaders.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.grpc.netty;

import io.grpc.internal.Protocol;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.AsciiString;
import java.util.Iterator;
Expand Down Expand Up @@ -46,11 +47,22 @@ static GrpcHttp2OutboundHeaders clientRequestHeaders(byte[][] serializedMetadata
return new GrpcHttp2OutboundHeaders(preHeaders, serializedMetadata);
}

static GrpcHttp2OutboundHeaders serverResponseHeaders(byte[][] serializedMetadata) {
AsciiString[] preHeaders = new AsciiString[] {
Http2Headers.PseudoHeaderName.STATUS.value(), Utils.STATUS_OK,
Utils.CONTENT_TYPE_HEADER, Utils.CONTENT_TYPE_GRPC,
};
static GrpcHttp2OutboundHeaders serverResponseHeaders(byte[][] serializedMetadata,
Protocol protocol) {
AsciiString[] preHeaders;
if (protocol == Protocol.GRPC) {
preHeaders = new AsciiString[] {
Http2Headers.PseudoHeaderName.STATUS.value(), Utils.STATUS_OK,
Utils.CONTENT_TYPE_HEADER, Utils.CONTENT_TYPE_GRPC,
};
} else {
// TODO: Add CORS headers!
preHeaders =
new AsciiString[] {
Http2Headers.PseudoHeaderName.STATUS.value(), Utils.STATUS_OK,
Utils.CONTENT_TYPE_HEADER, Utils.CONTENT_TYPE_GRPC_WEB_TEXT_PROTO,
};
}
return new GrpcHttp2OutboundHeaders(preHeaders, serializedMetadata);
}

Expand Down
15 changes: 11 additions & 4 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.LogExceptionRunnable;
import io.grpc.internal.Protocol;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
Expand Down Expand Up @@ -417,19 +418,23 @@ private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
return;
}
String contentTypeString = contentType.toString();
if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
boolean isGrpcWeb = GrpcUtil.isGrpcWebContentType(contentTypeString);
if (!isGrpcWeb && !GrpcUtil.isGrpcContentType(contentTypeString)) {
respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
String.format("Content-Type '%s' is not supported", contentTypeString));
return;
}
Protocol protocol = isGrpcWeb ? Protocol.GRPC_WEB_TEXT : Protocol.GRPC;

if (!HTTP_METHOD.contentEquals(headers.method())) {
respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
String.format("Method '%s' is not supported", headers.method()));
return;
}

if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
if ((protocol == Protocol.GRPC)
&& !teWarningLogged
&& !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
+ "some intermediate proxy may not support trailers",
TE_TRAILERS, headers.get(TE_HEADER)));
Expand All @@ -451,7 +456,8 @@ private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
maxMessageSize,
statsTraceCtx,
transportTracer,
method);
method,
protocol);

PerfMark.startTask("NettyServerHandler.onHeadersRead", state.tag());
try {
Expand All @@ -462,7 +468,8 @@ private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
attributes,
authority,
statsTraceCtx,
transportTracer);
transportTracer,
protocol);
transportListener.streamCreated(stream, method, metadata);
state.onStreamAllocated();
http2Stream.setProperty(streamKey, state);
Expand Down

0 comments on commit 18dc3d4

Please sign in to comment.