Skip to content

Commit

Permalink
Add accesslog for HTTP/3 server support
Browse files Browse the repository at this point in the history
Related to #1531
  • Loading branch information
violetagg committed Apr 23, 2024
1 parent d7c3296 commit 3933f90
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 6 deletions.
Expand Up @@ -30,20 +30,26 @@
import reactor.netty.NettyPipeline;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.logging.HttpMessageLogFactory;
import reactor.netty.http.server.logging.AccessLog;
import reactor.netty.http.server.logging.AccessLogArgProvider;
import reactor.netty.http.server.logging.AccessLogHandlerFactory;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;

import static reactor.netty.ReactorNetty.format;

final class Http3Codec extends ChannelInitializer<QuicStreamChannel> {

static final Logger log = Loggers.getLogger(Http3Codec.class);

final boolean accessLogEnabled;
final Function<AccessLogArgProvider, AccessLog> accessLog;
final BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate;
final ServerCookieDecoder cookieDecoder;
final ServerCookieEncoder cookieEncoder;
Expand All @@ -59,6 +65,8 @@ final class Http3Codec extends ChannelInitializer<QuicStreamChannel> {
final boolean validate;

Http3Codec(
boolean accessLogEnabled,
@Nullable Function<AccessLogArgProvider, AccessLog> accessLog,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
ServerCookieDecoder decoder,
ServerCookieEncoder encoder,
Expand All @@ -71,6 +79,8 @@ final class Http3Codec extends ChannelInitializer<QuicStreamChannel> {
@Nullable Duration readTimeout,
@Nullable Duration requestTimeout,
boolean validate) {
this.accessLogEnabled = accessLogEnabled;
this.accessLog = accessLog;
this.compressPredicate = compressPredicate;
this.cookieDecoder = decoder;
this.cookieEncoder = encoder;
Expand All @@ -88,6 +98,11 @@ final class Http3Codec extends ChannelInitializer<QuicStreamChannel> {
@Override
protected void initChannel(QuicStreamChannel channel) {
ChannelPipeline p = channel.pipeline();

if (accessLogEnabled) {
p.addLast(NettyPipeline.AccessLogHandler, AccessLogHandlerFactory.H3.create(accessLog));
}

p.addLast(new Http3FrameToHttpObjectCodec(true, validate))
.addLast(NettyPipeline.HttpTrafficHandler,
new Http3StreamBridgeServerHandler(compressPredicate, cookieDecoder, cookieEncoder, formDecoderProvider,
Expand All @@ -103,6 +118,8 @@ protected void initChannel(QuicStreamChannel channel) {
}

static ChannelHandler newHttp3ServerConnectionHandler(
boolean accessLogEnabled,
@Nullable Function<AccessLogArgProvider, AccessLog> accessLog,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
ServerCookieDecoder decoder,
ServerCookieEncoder encoder,
Expand All @@ -116,7 +133,7 @@ static ChannelHandler newHttp3ServerConnectionHandler(
@Nullable Duration requestTimeout,
boolean validate) {
return new Http3ServerConnectionHandler(
new Http3Codec(compressPredicate, decoder, encoder, formDecoderProvider, forwardedHeaderHandler,
new Http3Codec(accessLogEnabled, accessLog, compressPredicate, decoder, encoder, formDecoderProvider, forwardedHeaderHandler,
httpMessageLogFactory, listener, mapHandle, opsFactory, readTimeout, requestTimeout, validate));
}
}
Expand Up @@ -590,6 +590,8 @@ static BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate(

static void configureHttp3Pipeline(
ChannelPipeline p,
boolean accessLogEnabled,
@Nullable Function<AccessLogArgProvider, AccessLog> accessLog,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
ServerCookieDecoder cookieDecoder,
ServerCookieEncoder cookieEncoder,
Expand All @@ -604,9 +606,9 @@ static void configureHttp3Pipeline(
boolean validate) {
p.remove(NettyPipeline.ReactiveBridge);

p.addLast(NettyPipeline.HttpCodec, newHttp3ServerConnectionHandler(compressPredicate, cookieDecoder, cookieEncoder,
formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, listener, mapHandle, opsFactory,
readTimeout, requestTimeout, validate));
p.addLast(NettyPipeline.HttpCodec, newHttp3ServerConnectionHandler(accessLogEnabled, accessLog, compressPredicate,
cookieDecoder, cookieEncoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory,
listener, mapHandle, opsFactory, readTimeout, requestTimeout, validate));
}

static void configureH2Pipeline(ChannelPipeline p,
Expand Down Expand Up @@ -1395,6 +1397,8 @@ else if ((protocols & h2) == h2) {
else if ((protocols & h3) == h3) {
configureHttp3Pipeline(
channel.pipeline(),
accessLogEnabled,
accessLog,
compressPredicate(compressPredicate, minCompressionSize),
cookieDecoder,
cookieEncoder,
Expand Down
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty.http.server.logging;

import io.netty.incubator.codec.http3.Http3HeadersFrame;
import reactor.util.annotation.Nullable;

import java.net.SocketAddress;
import java.util.Objects;

final class AccessLogArgProviderH3 extends AbstractAccessLogArgProvider<AccessLogArgProviderH3> {

static final String H3_PROTOCOL_NAME = "HTTP/3.0";

Http3HeadersFrame requestHeaders;
Http3HeadersFrame responseHeaders;

AccessLogArgProviderH3(@Nullable SocketAddress remoteAddress) {
super(remoteAddress);
}

AccessLogArgProviderH3 requestHeaders(Http3HeadersFrame requestHeaders) {
this.requestHeaders = Objects.requireNonNull(requestHeaders, "requestHeaders");
onRequest();
return get();
}

AccessLogArgProviderH3 responseHeaders(Http3HeadersFrame responseHeaders) {
this.responseHeaders = Objects.requireNonNull(responseHeaders, "responseHeaders");
return get();
}

@Override
@Nullable
public CharSequence status() {
return responseHeaders == null ? null : responseHeaders.headers().status();
}

@Override
@Nullable
public CharSequence requestHeader(CharSequence name) {
Objects.requireNonNull(name, "name");
return requestHeaders == null ? null : requestHeaders.headers().get(name);
}

@Override
@Nullable
public CharSequence responseHeader(CharSequence name) {
Objects.requireNonNull(name, "name");
return responseHeaders == null ? null : responseHeaders.headers().get(name);
}

@Override
void onRequest() {
super.onRequest();
if (requestHeaders != null) {
super.method = requestHeaders.headers().method();
super.uri = requestHeaders.headers().path();
super.protocol = H3_PROTOCOL_NAME;
}
}

@Override
void clear() {
super.clear();
this.requestHeaders = null;
this.responseHeaders = null;
}

@Override
public AccessLogArgProviderH3 get() {
return this;
}

}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,7 +34,11 @@ public enum AccessLogHandlerFactory {
/**
* HTTP/2.0.
*/
H2;
H2,
/**
* HTTP/3.0.
*/
H3;

/**
* Create an access log handler, {@link AccessLogHandlerH1} or {@link AccessLogHandlerH2}.
Expand All @@ -44,6 +48,8 @@ public enum AccessLogHandlerFactory {
*/
public ChannelHandler create(@Nullable Function<AccessLogArgProvider, AccessLog> accessLog) {
switch (this) {
case H3:
return new AccessLogHandlerH3(accessLog);
case H2:
return new AccessLogHandlerH2(accessLog);
case H1:
Expand Down
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty.http.server.logging;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.incubator.codec.http3.Http3DataFrame;
import io.netty.incubator.codec.http3.Http3HeadersFrame;
import io.netty.incubator.codec.quic.QuicChannel;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.server.HttpServerInfos;
import reactor.util.annotation.Nullable;

import java.util.function.Function;

final class AccessLogHandlerH3 extends BaseAccessLogHandler {

AccessLogArgProviderH3 accessLogArgProvider;

AccessLogHandlerH3(@Nullable Function<AccessLogArgProvider, AccessLog> accessLog) {
super(accessLog);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof Http3HeadersFrame) {
Http3HeadersFrame requestHeaders = (Http3HeadersFrame) msg;

if (accessLogArgProvider == null) {
accessLogArgProvider = new AccessLogArgProviderH3(ctx.channel().parent() instanceof QuicChannel ?
((QuicChannel) ctx.channel().parent()).remoteSocketAddress() : null);
}
accessLogArgProvider.requestHeaders(requestHeaders);

ctx.channel()
.closeFuture()
.addListener(f -> {
AccessLog log = accessLog.apply(accessLogArgProvider);
if (log != null) {
log.log();
}
accessLogArgProvider.clear();
});
}
ctx.fireChannelRead(msg);
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof Http3HeadersFrame) {
Http3HeadersFrame responseHeaders = (Http3HeadersFrame) msg;

if (HttpResponseStatus.CONTINUE.codeAsText().contentEquals(responseHeaders.headers().status())) {
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
return;
}

accessLogArgProvider.responseHeaders(responseHeaders)
.chunked(true);

ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
if (ops instanceof HttpServerInfos) {
super.applyServerInfos(accessLogArgProvider, (HttpServerInfos) ops);
}
}
if (msg instanceof Http3DataFrame) {
Http3DataFrame data = (Http3DataFrame) msg;

accessLogArgProvider.increaseContentLength(data.content().readableBytes());
}

//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}
}
Expand Up @@ -237,6 +237,13 @@
"name": "reactor.netty.http.server.logging.AccessLogHandlerH2",
"queryAllPublicMethods": true
},
{
"condition": {
"typeReachable": "reactor.netty.http.server.logging.AccessLogHandlerH3"
},
"name": "reactor.netty.http.server.logging.AccessLogHandlerH3",
"queryAllPublicMethods": true
},
{
"condition": {
"typeReachable": "reactor.netty.http.server.logging.BaseAccessLogHandler"
Expand Down

0 comments on commit 3933f90

Please sign in to comment.