Skip to content

Commit

Permalink
Add metrics for HTTP/3 server support
Browse files Browse the repository at this point in the history
Related to #1531
  • Loading branch information
violetagg committed Apr 23, 2024
1 parent ccd71b5 commit 9cb663b
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 30 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
Expand Down Expand Up @@ -121,7 +122,12 @@ public void channelInactive(ChannelHandlerContext ctx) {
}
}

recordInactiveConnectionOrStream(ctx.channel());
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
HttpServerOperations ops = null;
if (channelOps instanceof HttpServerOperations) {
ops = (HttpServerOperations) channelOps;
}
recordInactiveConnectionOrStream(ctx.channel(), ops);

ctx.fireChannelInactive();
}
Expand Down Expand Up @@ -162,9 +168,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
}
}
}

recordInactiveConnectionOrStream(ctx.channel());
recordInactiveConnectionOrStream(ctx.channel(), ops);
}

dataSent = 0;
});
Expand All @@ -190,16 +196,20 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
startRead(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), methodTagValue.apply(ops.method().name()));
}

channelActivated = true;
if (ctx.channel() instanceof Http2StreamChannel) {
// Always use the real connection local address without any proxy information
recordOpenStream(ctx.channel().localAddress());
}
else {
// Always use the real connection local address without any proxy information
recordActiveConnection(ctx.channel().localAddress());
channelActivated = true;
if (ctx.channel() instanceof Http2StreamChannel) {
// Always use the real connection local address without any proxy information
recordOpenStream(ops.connectionHostAddress());
}
else if (ctx.channel() instanceof SocketChannel) {
// Always use the real connection local address without any proxy information
recordActiveConnection(ops.connectionHostAddress());
}
else {
// Always use the real connection local address without any proxy information
recordOpenStream(ops.connectionHostAddress());
}
}
}

Expand Down Expand Up @@ -311,17 +321,22 @@ protected void startWrite(HttpServerOperations ops, String path, String method,
dataSentTime = System.nanoTime();
}

void recordInactiveConnectionOrStream(Channel channel) {
void recordInactiveConnectionOrStream(Channel channel, @Nullable HttpServerOperations ops) {
if (channelActivated) {
channelActivated = false;
try {
SocketAddress localAddress = ops != null ? ops.connectionHostAddress() : channel.localAddress();
if (channel instanceof Http2StreamChannel) {
// Always use the real connection local address without any proxy information
recordClosedStream(channel.localAddress());
recordClosedStream(localAddress);
}
else if (channel instanceof SocketChannel) {
// Always use the real connection local address without any proxy information
recordInactiveConnection(localAddress);
}
else {
// Always use the real connection local address without any proxy information
recordInactiveConnection(channel.localAddress());
recordClosedStream(localAddress);
}
}
catch (RuntimeException e) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,6 @@
import java.net.SocketAddress;
import java.util.function.BiFunction;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpRequest;
import reactor.util.annotation.Nullable;
Expand Down Expand Up @@ -57,7 +56,7 @@ public final class ConnectionInfo {

final boolean isInetAddress;

static ConnectionInfo from(Channel channel, HttpRequest request, boolean secured, SocketAddress remoteAddress,
static ConnectionInfo from(HttpRequest request, boolean secured, SocketAddress localAddress, SocketAddress remoteAddress,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler) {
String hostName = DEFAULT_HOST_NAME;
int hostPort = -1;
Expand All @@ -76,11 +75,11 @@ static ConnectionInfo from(Channel channel, HttpRequest request, boolean secured
}

if (!(remoteAddress instanceof InetSocketAddress)) {
return new ConnectionInfo(channel.localAddress(), hostName, hostPort, remoteAddress, scheme, false);
return new ConnectionInfo(localAddress, hostName, hostPort, remoteAddress, scheme, false);
}
else {
ConnectionInfo connectionInfo =
new ConnectionInfo(channel.localAddress(), hostName, hostPort, remoteAddress, scheme, true);
new ConnectionInfo(localAddress, hostName, hostPort, remoteAddress, scheme, true);
if (forwardedHeaderHandler != null) {
return forwardedHeaderHandler.apply(connectionInfo, request);
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,9 +128,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
ConnectionInfo connectionInfo = null;
try {
pendingResponse = true;
connectionInfo = ConnectionInfo.from(ctx.channel(),
connectionInfo = ConnectionInfo.from(
request,
secured,
ctx.channel().localAddress(),
remoteAddress,
forwardedHeaderHandler);
ops = new HttpServerOperations(Connection.from(ctx.channel()),
Expand Down
Expand Up @@ -28,6 +28,7 @@
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyPipeline;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.logging.HttpMessageLogFactory;
import reactor.netty.http.server.logging.AccessLog;
Expand Down Expand Up @@ -59,10 +60,13 @@ final class Http3Codec extends ChannelInitializer<QuicStreamChannel> {
final ConnectionObserver listener;
final BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>>
mapHandle;
final Function<String, String> methodTagValue;
final ChannelMetricsRecorder metricsRecorder;
final int minCompressionSize;
final ChannelOperations.OnSetup opsFactory;
final Duration readTimeout;
final Duration requestTimeout;
final Function<String, String> uriTagValue;
final boolean validate;

Http3Codec(
Expand All @@ -76,10 +80,13 @@ final class Http3Codec extends ChannelInitializer<QuicStreamChannel> {
HttpMessageLogFactory httpMessageLogFactory,
ConnectionObserver listener,
@Nullable BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>> mapHandle,
@Nullable Function<String, String> methodTagValue,
@Nullable ChannelMetricsRecorder metricsRecorder,
int minCompressionSize,
ChannelOperations.OnSetup opsFactory,
@Nullable Duration readTimeout,
@Nullable Duration requestTimeout,
@Nullable Function<String, String> uriTagValue,
boolean validate) {
this.accessLogEnabled = accessLogEnabled;
this.accessLog = accessLog;
Expand All @@ -91,10 +98,13 @@ final class Http3Codec extends ChannelInitializer<QuicStreamChannel> {
this.httpMessageLogFactory = httpMessageLogFactory;
this.listener = listener;
this.mapHandle = mapHandle;
this.methodTagValue = methodTagValue;
this.metricsRecorder = metricsRecorder;
this.minCompressionSize = minCompressionSize;
this.opsFactory = opsFactory;
this.readTimeout = readTimeout;
this.requestTimeout = requestTimeout;
this.uriTagValue = uriTagValue;
this.validate = validate;
}

Expand All @@ -119,6 +129,22 @@ protected void initChannel(QuicStreamChannel channel) {

ChannelOperations.addReactiveBridge(channel, opsFactory, listener);

if (metricsRecorder != null) {
if (metricsRecorder instanceof HttpServerMetricsRecorder) {
ChannelHandler handler;
if (metricsRecorder instanceof MicrometerHttpServerMetricsRecorder) {
handler = new MicrometerHttpServerMetricsHandler((MicrometerHttpServerMetricsRecorder) metricsRecorder, methodTagValue, uriTagValue);
}
else if (metricsRecorder instanceof ContextAwareHttpServerMetricsRecorder) {
handler = new ContextAwareHttpServerMetricsHandler((ContextAwareHttpServerMetricsRecorder) metricsRecorder, methodTagValue, uriTagValue);
}
else {
handler = new HttpServerMetricsHandler((HttpServerMetricsRecorder) metricsRecorder, methodTagValue, uriTagValue);
}
p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler);
}
}

channel.pipeline().remove(this);

if (log.isDebugEnabled()) {
Expand All @@ -137,13 +163,17 @@ static ChannelHandler newHttp3ServerConnectionHandler(
HttpMessageLogFactory httpMessageLogFactory,
ConnectionObserver listener,
@Nullable BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>> mapHandle,
@Nullable Function<String, String> methodTagValue,
@Nullable ChannelMetricsRecorder metricsRecorder,
int minCompressionSize,
ChannelOperations.OnSetup opsFactory,
@Nullable Duration readTimeout,
@Nullable Duration requestTimeout,
@Nullable Function<String, String> uriTagValue,
boolean validate) {
return new Http3ServerConnectionHandler(
new Http3Codec(accessLogEnabled, accessLog, compressPredicate, decoder, encoder, formDecoderProvider, forwardedHeaderHandler,
httpMessageLogFactory, listener, mapHandle, minCompressionSize, opsFactory, readTimeout, requestTimeout, validate));
httpMessageLogFactory, listener, mapHandle, methodTagValue, metricsRecorder, minCompressionSize,
opsFactory, readTimeout, requestTimeout, uriTagValue, validate));
}
}
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty.http.server;

import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import io.netty.incubator.codec.quic.QuicChannel;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.http.logging.HttpMessageLogFactory;
import reactor.util.annotation.Nullable;

import java.net.SocketAddress;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

final class Http3ServerOperations extends HttpServerOperations {

Http3ServerOperations(HttpServerOperations replaced) {
super(replaced);
}

Http3ServerOperations(
Connection c,
ConnectionObserver listener,
HttpRequest nettyRequest,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressionPredicate,
ConnectionInfo connectionInfo,
ServerCookieDecoder decoder,
ServerCookieEncoder encoder,
HttpServerFormDecoderProvider formDecoderProvider,
HttpMessageLogFactory httpMessageLogFactory,
boolean isHttp2,
@Nullable BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>> mapHandle,
@Nullable Duration readTimeout,
@Nullable Duration requestTimeout,
boolean secured,
ZonedDateTime timestamp) {
super(c, listener, nettyRequest, compressionPredicate, connectionInfo, decoder, encoder, formDecoderProvider,
httpMessageLogFactory, isHttp2, mapHandle, readTimeout, requestTimeout, secured, timestamp);
}

Http3ServerOperations(
Connection c,
ConnectionObserver listener,
HttpRequest nettyRequest,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressionPredicate,
ConnectionInfo connectionInfo,
ServerCookieDecoder decoder,
ServerCookieEncoder encoder,
HttpServerFormDecoderProvider formDecoderProvider,
HttpMessageLogFactory httpMessageLogFactory,
boolean isHttp2,
@Nullable BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>> mapHandle,
@Nullable Duration readTimeout,
@Nullable Duration requestTimeout,
boolean resolvePath,
boolean secured,
ZonedDateTime timestamp) {
super(c, listener, nettyRequest, compressionPredicate, connectionInfo, decoder, encoder, formDecoderProvider,
httpMessageLogFactory, isHttp2, mapHandle, readTimeout, requestTimeout, resolvePath, secured, timestamp);
}

@Override
public SocketAddress connectionHostAddress() {
return ((QuicChannel) channel().parent()).localSocketAddress();
}

@Override
public SocketAddress connectionRemoteAddress() {
return ((QuicChannel) channel().parent()).remoteSocketAddress();
}
}
Expand Up @@ -30,6 +30,7 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import io.netty.incubator.codec.quic.QuicChannel;
import io.netty.incubator.codec.quic.QuicStreamChannel;
import io.netty.util.ReferenceCountUtil;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -105,8 +106,10 @@ public void channelActive(ChannelHandlerContext ctx) {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
assert ctx.channel().parent() instanceof QuicChannel;
QuicChannel parent = (QuicChannel) ctx.channel().parent();
if (remoteAddress == null) {
remoteAddress = ctx.channel().parent().remoteAddress();
remoteAddress = parent.remoteSocketAddress();
}
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
Expand All @@ -115,12 +118,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
ConnectionInfo connectionInfo = null;
try {
pendingResponse = true;
connectionInfo = ConnectionInfo.from(ctx.channel(),
connectionInfo = ConnectionInfo.from(
request,
true,
parent.localSocketAddress(),
remoteAddress,
forwardedHeaderHandler);
ops = new HttpServerOperations(Connection.from(ctx.channel()),
ops = new Http3ServerOperations(Connection.from(ctx.channel()),
listener,
request,
compress,
Expand Down

0 comments on commit 9cb663b

Please sign in to comment.