Skip to content

Commit

Permalink
brotli support in HttpClient
Browse files Browse the repository at this point in the history
  • Loading branch information
sullis committed Apr 16, 2024
1 parent 3f89d0d commit 0b893c4
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,18 @@ protected CoreSubscriber<PooledRef<Connection>> createDisposableAcquire(
MonoSink<Connection> sink,
Context currentContext) {
boolean acceptGzip = false;
boolean acceptBrotli = false;
ChannelMetricsRecorder metricsRecorder = config.metricsRecorder() != null ? config.metricsRecorder().get() : null;
SocketAddress proxyAddress = ((ClientTransportConfig<?>) config).proxyProvider() != null ?
((ClientTransportConfig<?>) config).proxyProvider().getAddress().get() : null;
Function<String, String> uriTagValue = null;
if (config instanceof HttpClientConfig) {
acceptGzip = ((HttpClientConfig) config).acceptGzip;
acceptBrotli = ((HttpClientConfig) config).acceptBrotli;
uriTagValue = ((HttpClientConfig) config).uriTagValue;
}
return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(),
acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, proxyAddress, remoteAddress, sink, currentContext, uriTagValue);
acceptGzip, acceptBrotli, metricsRecorder, pendingAcquireTimeout, pool, proxyAddress, remoteAddress, sink, currentContext, uriTagValue);
}

@Override
Expand Down Expand Up @@ -238,6 +240,7 @@ static final class DisposableAcquire
final ConnectionObserver obs;
final ChannelOperations.OnSetup opsFactory;
final boolean acceptGzip;
final boolean acceptBrotli;
final ChannelMetricsRecorder metricsRecorder;
final long pendingAcquireTimeout;
final InstrumentedPool<Connection> pool;
Expand All @@ -254,6 +257,7 @@ static final class DisposableAcquire
ConnectionObserver obs,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
long pendingAcquireTimeout,
InstrumentedPool<Connection> pool,
Expand All @@ -267,6 +271,7 @@ static final class DisposableAcquire
this.obs = obs;
this.opsFactory = opsFactory;
this.acceptGzip = acceptGzip;
this.acceptBrotli = acceptBrotli;
this.metricsRecorder = metricsRecorder;
this.pendingAcquireTimeout = pendingAcquireTimeout;
this.pool = pool;
Expand All @@ -283,6 +288,7 @@ static final class DisposableAcquire
this.obs = parent.obs;
this.opsFactory = parent.opsFactory;
this.acceptGzip = parent.acceptGzip;
this.acceptBrotli = parent.acceptBrotli;
this.metricsRecorder = parent.metricsRecorder;
this.pendingAcquireTimeout = parent.pendingAcquireTimeout;
this.pool = parent.pool;
Expand Down Expand Up @@ -412,7 +418,7 @@ public void operationComplete(Future<Http2StreamChannel> future) {
setChannelContext(ch, currentContext());
}
HttpClientConfig.addStreamHandlers(ch, obs.then(new HttpClientConfig.StreamConnectionObserver(currentContext())),
opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue);
opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue);

ChannelOperations<?, ?> ops = ChannelOperations.get(ch);
if (ops != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.compression.Brotli;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
Expand Down Expand Up @@ -515,7 +516,13 @@ public final HttpClient baseUrl(String baseUrl) {
* @return a new {@link HttpClient}
*/
public final HttpClient compress(boolean compressionEnabled) {
configuration().headers.remove(HttpHeaderNames.ACCEPT_ENCODING);
if (compressionEnabled) {
configuration().acceptBrotli = Brotli.isAvailable();
if (configuration().acceptBrotli) {
configuration().headers.add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.BR);
}

if (!configuration().acceptGzip) {
HttpClient dup = duplicate();
HttpHeaders headers = configuration().headers.copy();
Expand All @@ -525,14 +532,15 @@ public final HttpClient compress(boolean compressionEnabled) {
return dup;
}
}
else if (configuration().acceptGzip) {
else if (configuration().acceptGzip || configuration().acceptBrotli) {
HttpClient dup = duplicate();
if (isCompressing(configuration().headers)) {
HttpHeaders headers = configuration().headers.copy();
headers.remove(HttpHeaderNames.ACCEPT_ENCODING);
dup.configuration().headers = headers;
}
dup.configuration().acceptGzip = false;
dup.configuration().acceptBrotli = false;
return dup;
}
return this;
Expand Down Expand Up @@ -1602,7 +1610,8 @@ public final HttpClient wiretap(boolean enable) {
}

static boolean isCompressing(HttpHeaders h) {
return h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP, true);
return h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP, true)
|| h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.BR, true);
}

static String reactorNettyVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public String baseUrl() {
public int channelHash() {
int result = super.channelHash();
result = 31 * result + Boolean.hashCode(acceptGzip);
result = 31 * result + Boolean.hashCode(acceptBrotli);
result = 31 * result + Objects.hashCode(decoder);
result = 31 * result + _protocols;
result = 31 * result + Objects.hashCode(sslProvider);
Expand Down Expand Up @@ -202,6 +203,15 @@ public boolean isAcceptGzip() {
return acceptGzip;
}

/**
* Return whether Brotli compression is enabled.
*
* @return whether Brotli compression is enabled
*/
public boolean isAcceptBrotli() {
return acceptBrotli;
}

/**
* Return true if {@code retry once} is disabled, false otherwise.
*
Expand Down Expand Up @@ -314,6 +324,7 @@ public WebsocketClientSpec websocketClientSpec() {
// Protected/Package private write API

boolean acceptGzip;
boolean acceptBrotli;
String baseUrl;
BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> body;
Function<? super Mono<? extends Connection>, ? extends Mono<? extends Connection>> connector;
Expand Down Expand Up @@ -349,6 +360,7 @@ public WebsocketClientSpec websocketClientSpec() {
Supplier<? extends SocketAddress> remoteAddress) {
super(connectionProvider, options, remoteAddress);
this.acceptGzip = false;
this.acceptBrotli = false;
this.cookieDecoder = ClientCookieDecoder.STRICT;
this.cookieEncoder = ClientCookieEncoder.STRICT;
this.decoder = new HttpResponseDecoderSpec();
Expand All @@ -363,6 +375,7 @@ public WebsocketClientSpec websocketClientSpec() {
HttpClientConfig(HttpClientConfig parent) {
super(parent);
this.acceptGzip = parent.acceptGzip;
this.acceptBrotli = parent.acceptBrotli;
this.baseUrl = parent.baseUrl;
this.body = parent.body;
this.connector = parent.connector;
Expand Down Expand Up @@ -537,6 +550,7 @@ static void addStreamHandlers(
ConnectionObserver obs,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable SocketAddress proxyAddress,
SocketAddress remoteAddress,
Expand All @@ -551,7 +565,7 @@ static void addStreamHandlers(
pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT)
.addLast(NettyPipeline.HttpTrafficHandler, HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER);

if (acceptGzip) {
if (acceptGzip || acceptBrotli) {
pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
}

Expand Down Expand Up @@ -617,7 +631,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
}
}

static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpResponseDecoderSpec decoder,
static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, boolean acceptBrotli, HttpResponseDecoderSpec decoder,
Http2Settings http2Settings, ConnectionObserver observer) {
Http2FrameCodecBuilder http2FrameCodecBuilder =
Http2FrameCodecBuilder.forClient()
Expand All @@ -639,6 +653,7 @@ static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpRe
static void configureHttp11OrH2CleartextPipeline(
ChannelPipeline p,
boolean acceptGzip,
boolean acceptBrotli,
HttpResponseDecoderSpec decoder,
Http2Settings http2Settings,
@Nullable ChannelMetricsRecorder metricsRecorder,
Expand Down Expand Up @@ -670,7 +685,7 @@ static void configureHttp11OrH2CleartextPipeline(
Http2FrameCodec http2FrameCodec = http2FrameCodecBuilder.build();

Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec,
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, uriTagValue));
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, uriTagValue));

HttpClientUpgradeHandler upgradeHandler =
new ReactorNettyHttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());
Expand All @@ -679,7 +694,7 @@ static void configureHttp11OrH2CleartextPipeline(
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler)
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(observer));

if (acceptGzip) {
if (acceptGzip || acceptBrotli) {
p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
}

Expand All @@ -704,6 +719,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
@SuppressWarnings("deprecation")
static void configureHttp11Pipeline(ChannelPipeline p,
boolean acceptGzip,
boolean acceptBrotli,
HttpResponseDecoderSpec decoder,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable SocketAddress proxyAddress,
Expand All @@ -720,7 +736,7 @@ static void configureHttp11Pipeline(ChannelPipeline p,
NettyPipeline.HttpCodec,
new HttpClientCodec(decoderConfig, decoder.failOnMissingResponse, decoder.parseHttpAfterConnectRequest));

if (acceptGzip) {
if (acceptGzip || acceptBrotli) {
p.addAfter(NettyPipeline.HttpCodec, NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
}

Expand Down Expand Up @@ -779,6 +795,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
static final class H2CleartextCodec extends ChannelHandlerAdapter {

final boolean acceptGzip;
final boolean acceptBrotli;
final Http2FrameCodec http2FrameCodec;
final ChannelMetricsRecorder metricsRecorder;
final ChannelOperations.OnSetup opsFactory;
Expand All @@ -790,11 +807,13 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter {
Http2FrameCodec http2FrameCodec,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable SocketAddress proxyAddress,
SocketAddress remoteAddress,
@Nullable Function<String, String> uriTagValue) {
this.acceptGzip = acceptGzip;
this.acceptBrotli = acceptBrotli;
this.http2FrameCodec = http2FrameCodec;
this.metricsRecorder = metricsRecorder;
this.opsFactory = opsFactory;
Expand All @@ -819,12 +838,12 @@ public void handlerAdded(ChannelHandlerContext ctx) {
if (responseTimeoutHandler != null) {
pipeline.remove(NettyPipeline.ResponseTimeoutHandler);
http2MultiplexHandler = new Http2MultiplexHandler(H2InboundStreamHandler.INSTANCE,
new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, proxyAddress,
new H2Codec(owner, obs, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress,
remoteAddress, responseTimeoutHandler.getReaderIdleTimeInMillis(), uriTagValue));
}
else {
http2MultiplexHandler = new Http2MultiplexHandler(H2InboundStreamHandler.INSTANCE,
new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, uriTagValue));
new H2Codec(owner, obs, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, uriTagValue));
}
pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, http2FrameCodec)
.addAfter(NettyPipeline.HttpCodec, NettyPipeline.H2MultiplexHandler, http2MultiplexHandler);
Expand All @@ -839,6 +858,7 @@ public void handlerAdded(ChannelHandlerContext ctx) {
static final class H2Codec extends ChannelInitializer<Channel> {

final boolean acceptGzip;
final boolean acceptBrotli;
final ChannelMetricsRecorder metricsRecorder;
final ConnectionObserver observer;
final ChannelOperations.OnSetup opsFactory;
Expand All @@ -853,26 +873,29 @@ static final class H2Codec extends ChannelInitializer<Channel> {
@Nullable ConnectionObserver observer,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable SocketAddress proxyAddress,
SocketAddress remoteAddress,
@Nullable Function<String, String> uriTagValue) {
// Handle outbound and upgrade streams
this(owner, observer, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue);
this(owner, observer, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue);
}

H2Codec(
@Nullable Http2ConnectionProvider.DisposableAcquire owner,
@Nullable ConnectionObserver observer,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable SocketAddress proxyAddress,
SocketAddress remoteAddress,
long responseTimeoutMillis,
@Nullable Function<String, String> uriTagValue) {
// Handle outbound and upgrade streams
this.acceptGzip = acceptGzip;
this.acceptBrotli = acceptBrotli;
this.metricsRecorder = metricsRecorder;
this.observer = observer;
this.opsFactory = opsFactory;
Expand All @@ -891,7 +914,7 @@ protected void initChannel(Channel ch) {
setChannelContext(ch, owner.currentContext());
}
addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory,
acceptGzip, metricsRecorder, proxyAddress, remoteAddress, responseTimeoutMillis, uriTagValue);
acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, responseTimeoutMillis, uriTagValue);
}
else {
// Handle server pushes (inbound streams)
Expand All @@ -915,6 +938,7 @@ public boolean isSharable() {

static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter {
final boolean acceptGzip;
final boolean acceptBrotli;
final HttpResponseDecoderSpec decoder;
final Http2Settings http2Settings;
final ChannelMetricsRecorder metricsRecorder;
Expand All @@ -925,6 +949,7 @@ static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter {

H2OrHttp11Codec(HttpClientChannelInitializer initializer, ConnectionObserver observer, SocketAddress remoteAddress) {
this.acceptGzip = initializer.acceptGzip;
this.acceptBrotli = initializer.acceptBrotli;
this.decoder = initializer.decoder;
this.http2Settings = initializer.http2Settings;
this.metricsRecorder = initializer.metricsRecorder;
Expand All @@ -945,10 +970,10 @@ public void channelActive(ChannelHandlerContext ctx) {
log.debug(format(ctx.channel(), "Negotiated application-level protocol [" + protocol + "]"));
}
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
configureHttp2Pipeline(ctx.channel().pipeline(), acceptGzip, decoder, http2Settings, observer);
configureHttp2Pipeline(ctx.channel().pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, observer);
}
else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
configureHttp11Pipeline(ctx.channel().pipeline(), acceptGzip, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue);
configureHttp11Pipeline(ctx.channel().pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue);
}
else {
throw new IllegalStateException("unknown protocol: " + protocol);
Expand All @@ -967,6 +992,7 @@ else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
static final class HttpClientChannelInitializer implements ChannelPipelineConfigurer {

final boolean acceptGzip;
final boolean acceptBrotli;
final HttpResponseDecoderSpec decoder;
final Http2Settings http2Settings;
final ChannelMetricsRecorder metricsRecorder;
Expand All @@ -978,6 +1004,7 @@ static final class HttpClientChannelInitializer implements ChannelPipelineConfig

HttpClientChannelInitializer(HttpClientConfig config) {
this.acceptGzip = config.acceptGzip;
this.acceptBrotli = config.acceptBrotli;
this.decoder = config.decoder;
this.http2Settings = config.http2Settings();
this.metricsRecorder = config.metricsRecorderInternal();
Expand All @@ -999,21 +1026,21 @@ public void onChannelInit(ConnectionObserver observer, Channel channel, @Nullabl
new H2OrHttp11Codec(this, observer, remoteAddress));
}
else if ((protocols & h11) == h11) {
configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue);
configureHttp11Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue);
}
else if ((protocols & h2) == h2) {
configureHttp2Pipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, observer);
configureHttp2Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, observer);
}
}
else {
if ((protocols & h11orH2C) == h11orH2C) {
configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, metricsRecorder, observer, opsFactory, proxyAddress, remoteAddress, uriTagValue);
configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, metricsRecorder, observer, opsFactory, proxyAddress, remoteAddress, uriTagValue);
}
else if ((protocols & h11) == h11) {
configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue);
configureHttp11Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue);
}
else if ((protocols & h2c) == h2c) {
configureHttp2Pipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, observer);
configureHttp2Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, observer);
}
}
}
Expand Down

0 comments on commit 0b893c4

Please sign in to comment.