From 59d0d230f69ac098fff5d1a281bc3d1993b532cc Mon Sep 17 00:00:00 2001 From: samueldlightfoot Date: Tue, 5 Jul 2022 22:32:32 +0100 Subject: [PATCH] Capture HTTP/2 server active stream metrics --- docs/asciidoc/http-server.adoc | 2 + .../AbstractHttpServerMetricsHandler.java | 30 +++++++++---- .../netty/http/server/HttpServerMeters.java | 43 +++++++++++++++++++ .../server/HttpServerMetricsRecorder.java | 17 ++++++++ .../MicrometerHttpServerMetricsRecorder.java | 34 +++++++++++++++ .../netty/http/HttpMetricsHandlerTests.java | 8 +++- 6 files changed, 124 insertions(+), 10 deletions(-) diff --git a/docs/asciidoc/http-server.adoc b/docs/asciidoc/http-server.adoc index 71fa381d8c..4a1f3d524d 100644 --- a/docs/asciidoc/http-server.adoc +++ b/docs/asciidoc/http-server.adoc @@ -560,6 +560,8 @@ The following table provides information for the HTTP server metrics: [width="100%",options="header"] |======= | metric name | type | description +| reactor.netty.http.server.active.streams | Gauge | The number of active HTTP/2 streams. +See <> | reactor.netty.http.server.connections.active | Gauge | The number of http connections currently processing requests. See <> | reactor.netty.http.server.connections.total | Gauge | The number of all opened connections. diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java index 7f07157d40..f70cd40be8 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java @@ -45,12 +45,10 @@ abstract class AbstractHttpServerMetricsHandler extends ChannelDuplexHandler { long dataSent; - long dataReceivedTime; long dataSentTime; - final Function uriTagValue; protected AbstractHttpServerMetricsHandler(@Nullable Function uriTagValue) { @@ -130,12 +128,16 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) log.warn("Exception caught while recording metrics.", e); // Allow request-response exchange to continue, unaffected by metrics problem } - if (!ops.isHttp2() && ops.hostAddress() != null) { - // This metric is not applicable for HTTP/2 - // ops.hostAddress() == null when request decoding failed, in this case - // we do not report active connection, so we do not report inactive connection + // ops.hostAddress() == null when request decoding failed, in this case + // we do not report active connection, so we do not report inactive connection + if (ops.hostAddress() != null) { try { - recordInactiveConnection(ops); + if (ops.isHttp2()) { + recordClosedStream(ops); + } + else { + recordInactiveConnection(ops); + } } catch (RuntimeException e) { log.warn("Exception caught while recording metrics.", e); @@ -165,8 +167,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); if (channelOps instanceof HttpServerOperations) { HttpServerOperations ops = (HttpServerOperations) channelOps; - if (!ops.isHttp2()) { - // This metric is not applicable for HTTP/2 + if (ops.isHttp2()) { + recordOpenStream(ops); + } + else { recordActiveConnection(ops); } startRead(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), ops.method().name()); @@ -258,6 +262,14 @@ protected void recordInactiveConnection(HttpServerOperations ops) { recorder().recordServerConnectionInactive(ops.hostAddress()); } + protected void recordOpenStream(HttpServerOperations ops) { + recorder().recordStreamOpened(ops.hostAddress()); + } + + protected void recordClosedStream(HttpServerOperations ops) { + recorder().recordStreamClosed(ops.hostAddress()); + } + protected void startRead(HttpServerOperations ops, String path, String method) { dataReceivedTime = System.nanoTime(); } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerMeters.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerMeters.java index c03e37ee92..977668a0db 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerMeters.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerMeters.java @@ -47,6 +47,26 @@ public Meter.Type getType() { } }, + /** + * The number of HTTP/2 streams currently active on the server + */ + ACTIVE_STREAMS { + @Override + public String getName() { + return "reactor.netty.http.server.active.streams"; + } + + @Override + public KeyName[] getKeyNames() { + return ActiveStreamsTags.values(); + } + + @Override + public Meter.Type getType() { + return Meter.Type.GAUGE; + } + }, + /** * Amount of the data received, in bytes. */ @@ -157,6 +177,29 @@ public Meter.Type getType() { } }; + enum ActiveStreamsTags implements KeyName { + + /** + * Local address. + */ + LOCAL_ADDRESS { + @Override + public String getKeyName() { + return "local.address"; + } + }, + + /** + * URI. + */ + URI { + @Override + public String getKeyName() { + return "uri"; + } + } + } + enum ConnectionsActiveTags implements KeyName { /** diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerMetricsRecorder.java index 944ab9d9d6..165c4fd34b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerMetricsRecorder.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerMetricsRecorder.java @@ -71,4 +71,21 @@ default void recordServerConnectionActive(SocketAddress localAddress) { } * @since 1.0.15 */ default void recordServerConnectionInactive(SocketAddress localAddress) { } + + /** + * Record an opened HTTP/2 stream + * + * @param localAddress the local server address + * @since 1.0.21 + */ + default void recordStreamOpened(SocketAddress localAddress) { } + + /** + * Record a closed HTTP/2 stream + * + * @param localAddress the local server address + * @since 1.0.21 + */ + default void recordStreamClosed(SocketAddress localAddress) { } + } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/MicrometerHttpServerMetricsRecorder.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/MicrometerHttpServerMetricsRecorder.java index 66cb43d331..a0d0715849 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/MicrometerHttpServerMetricsRecorder.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/MicrometerHttpServerMetricsRecorder.java @@ -42,6 +42,7 @@ import static reactor.netty.Metrics.STATUS; import static reactor.netty.Metrics.URI; import static reactor.netty.http.server.HttpServerMeters.CONNECTIONS_ACTIVE; +import static reactor.netty.http.server.HttpServerMeters.ACTIVE_STREAMS; /** * @author Violeta Georgieva @@ -52,7 +53,9 @@ final class MicrometerHttpServerMetricsRecorder extends MicrometerHttpMetricsRec final static MicrometerHttpServerMetricsRecorder INSTANCE = new MicrometerHttpServerMetricsRecorder(); private final static String PROTOCOL_VALUE_HTTP = "http"; private final LongAdder activeConnectionsAdder = new LongAdder(); + private final LongAdder activeStreamsAdder = new LongAdder(); private final ConcurrentMap activeConnectionsCache = new ConcurrentHashMap<>(); + private final ConcurrentMap activeStreamsCache = new ConcurrentHashMap<>(); private final ConcurrentMap dataReceivedCache = new ConcurrentHashMap<>(); private final ConcurrentMap dataSentCache = new ConcurrentHashMap<>(); private final ConcurrentMap errorsCache = new ConcurrentHashMap<>(); @@ -156,6 +159,22 @@ public void recordServerConnectionInactive(SocketAddress localAddress) { } } + @Override + public void recordStreamOpened(SocketAddress localAddress) { + LongAdder adder = getActiveStreamsAdder(localAddress); + if (adder != null) { + adder.increment(); + } + } + + @Override + public void recordStreamClosed(SocketAddress localAddress) { + LongAdder adder = getActiveStreamsAdder(localAddress); + if (adder != null) { + adder.decrement(); + } + } + @Override public void recordDataReceived(SocketAddress remoteAddress, long bytes) { // noop @@ -186,6 +205,21 @@ public void recordResolveAddressTime(SocketAddress remoteAddress, Duration time, throw new UnsupportedOperationException(); } + @Nullable + private LongAdder getActiveStreamsAdder(SocketAddress localAddress) { + String address = reactor.netty.Metrics.formatSocketAddress(localAddress); + return MapUtils.computeIfAbsent(activeStreamsCache, address, + key -> { + Gauge gauge = filter( + Gauge.builder(ACTIVE_STREAMS.getName(), activeStreamsAdder, LongAdder::longValue) + .tags(HttpServerMeters.ActiveStreamsTags.URI.getKeyName(), PROTOCOL_VALUE_HTTP, + HttpServerMeters.ActiveStreamsTags.LOCAL_ADDRESS.getKeyName(), address) + .register(REGISTRY)); + return gauge != null ? activeStreamsAdder : null; + }); + } + + @Nullable private LongAdder getServerConnectionAdder(SocketAddress localAddress) { String address = reactor.netty.Metrics.formatSocketAddress(localAddress); return MapUtils.computeIfAbsent(activeConnectionsCache, address, diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index faab4b2c90..39f4d05f5b 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -70,6 +70,7 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static reactor.netty.Metrics.ACTIVE_STREAMS; import static reactor.netty.Metrics.CONNECTIONS_ACTIVE; import static reactor.netty.Metrics.CONNECTIONS_TOTAL; import static reactor.netty.Metrics.CONNECT_TIME; @@ -602,6 +603,7 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco } else { checkGauge(SERVER_CONNECTIONS_TOTAL, true, 1, URI, HTTP, LOCAL_ADDRESS, address); + checkGauge(SERVER_STREAMS_ACTIVE, true, 0, URI, HTTP, LOCAL_ADDRESS, address); } // These metrics are meant only for the servers, @@ -736,7 +738,10 @@ private void checkServerConnectionsMicrometer(HttpServerRequest request) { String address = formatSocketAddress(request.hostAddress()); boolean isHttp2 = request.requestHeaders().contains(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text()); checkGauge(SERVER_CONNECTIONS_TOTAL, true, 1, URI, HTTP, LOCAL_ADDRESS, address); - if (!isHttp2) { + if (isHttp2) { + checkGauge(SERVER_STREAMS_ACTIVE, true, 1, URI, HTTP, LOCAL_ADDRESS, address); + } + else { checkGauge(SERVER_CONNECTIONS_ACTIVE, true, 1, URI, HTTP, LOCAL_ADDRESS, address); } } @@ -940,6 +945,7 @@ static Stream httpCompatibleProtocols() { private static final String SERVER_CONNECTIONS_ACTIVE = HTTP_SERVER_PREFIX + CONNECTIONS_ACTIVE; private static final String SERVER_CONNECTIONS_TOTAL = HTTP_SERVER_PREFIX + CONNECTIONS_TOTAL; + private static final String SERVER_STREAMS_ACTIVE = HTTP_SERVER_PREFIX + ACTIVE_STREAMS; private static final String SERVER_RESPONSE_TIME = HTTP_SERVER_PREFIX + RESPONSE_TIME; private static final String SERVER_DATA_SENT_TIME = HTTP_SERVER_PREFIX + DATA_SENT_TIME; private static final String SERVER_DATA_RECEIVED_TIME = HTTP_SERVER_PREFIX + DATA_RECEIVED_TIME;