Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture HTTP/2 server active stream metrics #2357

Merged
merged 1 commit into from Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/asciidoc/http-server.adoc
Expand Up @@ -560,6 +560,8 @@ The following table provides information for the HTTP server metrics:
[width="100%",options="header"]
|=======
| metric name | type | description
| reactor.netty.http.server.streams.active | Gauge | The number of active HTTP/2 streams.
See <<observability-metrics-streams-active>>
| reactor.netty.http.server.connections.active | Gauge | The number of http connections currently processing requests.
See <<observability-metrics-connections-active>>
| reactor.netty.http.server.connections.total | Gauge | The number of all opened connections.
Expand Down
5 changes: 5 additions & 0 deletions reactor-netty-core/src/main/java/reactor/netty/Metrics.java
Expand Up @@ -256,6 +256,11 @@ public class Metrics {
*/
public static final String PENDING_TASKS = ".pending.tasks";

// HttpServer Metrics
/**
* The number of active HTTP/2 streams
*/
public static final String STREAMS_ACTIVE = ".streams.active";

// Tags
public static final String LOCAL_ADDRESS = "local.address";
Expand Down
4 changes: 4 additions & 0 deletions reactor-netty-http/build.gradle
Expand Up @@ -197,6 +197,10 @@ task japicmp(type: JapicmpTask) {
ignoreMissingClasses = true
includeSynthetic = true
onlyIf { "$compatibleVersion" != "SKIP" }
methodExcludes = [
"reactor.netty.http.server.HttpServerMetricsRecorder#recordStreamClosed(java.net.SocketAddress)",
"reactor.netty.http.server.HttpServerMetricsRecorder#recordStreamOpened(java.net.SocketAddress)"
]
}

tasks.japicmp.dependsOn(downloadBaseline)
Expand Down
Expand Up @@ -45,12 +45,10 @@ abstract class AbstractHttpServerMetricsHandler extends ChannelDuplexHandler {

long dataSent;


long dataReceivedTime;

long dataSentTime;


final Function<String, String> uriTagValue;

protected AbstractHttpServerMetricsHandler(@Nullable Function<String, String> uriTagValue) {
Expand Down Expand Up @@ -130,12 +128,16 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
if (!ops.isHttp2() && ops.hostAddress() != null) {
// This metric is not applicable for HTTP/2
// ops.hostAddress() == null when request decoding failed, in this case
// we do not report active connection, so we do not report inactive connection
// ops.hostAddress() == null when request decoding failed, in this case
// we do not report active connection, so we do not report inactive connection
if (ops.hostAddress() != null) {
try {
recordInactiveConnection(ops);
if (ops.isHttp2()) {
recordClosedStream(ops);
}
else {
recordInactiveConnection(ops);
}
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
Expand Down Expand Up @@ -165,8 +167,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
if (!ops.isHttp2()) {
// This metric is not applicable for HTTP/2
if (ops.isHttp2()) {
recordOpenStream(ops);
}
else {
recordActiveConnection(ops);
}
startRead(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), ops.method().name());
Expand Down Expand Up @@ -258,6 +262,14 @@ protected void recordInactiveConnection(HttpServerOperations ops) {
recorder().recordServerConnectionInactive(ops.hostAddress());
}

protected void recordOpenStream(HttpServerOperations ops) {
recorder().recordStreamOpened(ops.hostAddress());
}

protected void recordClosedStream(HttpServerOperations ops) {
recorder().recordStreamClosed(ops.hostAddress());
}

protected void startRead(HttpServerOperations ops, String path, String method) {
dataReceivedTime = System.nanoTime();
}
Expand Down
Expand Up @@ -47,6 +47,26 @@ public Meter.Type getType() {
}
},

/**
* The number of HTTP/2 streams currently active on the server
*/
STREAMS_ACTIVE {
@Override
public String getName() {
return "reactor.netty.http.server.streams.active";
}

@Override
public KeyName[] getKeyNames() {
return StreamsActiveTags.values();
}

@Override
public Meter.Type getType() {
return Meter.Type.GAUGE;
}
},

/**
* Amount of the data received, in bytes.
*/
Expand Down Expand Up @@ -157,6 +177,29 @@ public Meter.Type getType() {
}
};

enum StreamsActiveTags implements KeyName {

/**
* Local address.
*/
LOCAL_ADDRESS {
@Override
public String getKeyName() {
return "local.address";
}
},

/**
* URI.
*/
URI {
@Override
public String getKeyName() {
return "uri";
}
}
}

enum ConnectionsActiveTags implements KeyName {

/**
Expand Down
Expand Up @@ -71,4 +71,21 @@ default void recordServerConnectionActive(SocketAddress localAddress) { }
* @since 1.0.15
*/
default void recordServerConnectionInactive(SocketAddress localAddress) { }

/**
* Record an opened HTTP/2 stream
*
* @param localAddress the local server address
* @since 1.0.21
*/
default void recordStreamOpened(SocketAddress localAddress) { }

/**
* Record a closed HTTP/2 stream
*
* @param localAddress the local server address
* @since 1.0.21
*/
default void recordStreamClosed(SocketAddress localAddress) { }

}
Expand Up @@ -42,6 +42,7 @@
import static reactor.netty.Metrics.STATUS;
import static reactor.netty.Metrics.URI;
import static reactor.netty.http.server.HttpServerMeters.CONNECTIONS_ACTIVE;
import static reactor.netty.http.server.HttpServerMeters.STREAMS_ACTIVE;

/**
* @author Violeta Georgieva
Expand All @@ -52,7 +53,9 @@ final class MicrometerHttpServerMetricsRecorder extends MicrometerHttpMetricsRec
final static MicrometerHttpServerMetricsRecorder INSTANCE = new MicrometerHttpServerMetricsRecorder();
private final static String PROTOCOL_VALUE_HTTP = "http";
private final LongAdder activeConnectionsAdder = new LongAdder();
private final LongAdder activeStreamsAdder = new LongAdder();
private final ConcurrentMap<String, LongAdder> activeConnectionsCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, LongAdder> activeStreamsCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, DistributionSummary> dataReceivedCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, DistributionSummary> dataSentCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Counter> errorsCache = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -156,6 +159,22 @@ public void recordServerConnectionInactive(SocketAddress localAddress) {
}
}

@Override
public void recordStreamOpened(SocketAddress localAddress) {
LongAdder adder = getActiveStreamsAdder(localAddress);
if (adder != null) {
adder.increment();
}
}

@Override
public void recordStreamClosed(SocketAddress localAddress) {
LongAdder adder = getActiveStreamsAdder(localAddress);
if (adder != null) {
adder.decrement();
}
}

@Override
public void recordDataReceived(SocketAddress remoteAddress, long bytes) {
// noop
Expand Down Expand Up @@ -186,6 +205,21 @@ public void recordResolveAddressTime(SocketAddress remoteAddress, Duration time,
throw new UnsupportedOperationException();
}

@Nullable
private LongAdder getActiveStreamsAdder(SocketAddress localAddress) {
String address = reactor.netty.Metrics.formatSocketAddress(localAddress);
return MapUtils.computeIfAbsent(activeStreamsCache, address,
key -> {
Gauge gauge = filter(
Gauge.builder(STREAMS_ACTIVE.getName(), activeStreamsAdder, LongAdder::longValue)
.tags(HttpServerMeters.StreamsActiveTags.URI.getKeyName(), PROTOCOL_VALUE_HTTP,
HttpServerMeters.StreamsActiveTags.LOCAL_ADDRESS.getKeyName(), address)
.register(REGISTRY));
return gauge != null ? activeStreamsAdder : null;
});
}

@Nullable
private LongAdder getServerConnectionAdder(SocketAddress localAddress) {
String address = reactor.netty.Metrics.formatSocketAddress(localAddress);
return MapUtils.computeIfAbsent(activeConnectionsCache, address,
Expand Down
Expand Up @@ -85,6 +85,7 @@
import static reactor.netty.Metrics.REMOTE_ADDRESS;
import static reactor.netty.Metrics.RESPONSE_TIME;
import static reactor.netty.Metrics.STATUS;
import static reactor.netty.Metrics.STREAMS_ACTIVE;
import static reactor.netty.Metrics.TLS_HANDSHAKE_TIME;
import static reactor.netty.Metrics.URI;
import static reactor.netty.Metrics.formatSocketAddress;
Expand Down Expand Up @@ -602,6 +603,7 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco
}
else {
checkGauge(SERVER_CONNECTIONS_TOTAL, true, 1, URI, HTTP, LOCAL_ADDRESS, address);
checkGauge(SERVER_STREAMS_ACTIVE, true, 0, URI, HTTP, LOCAL_ADDRESS, address);
}

// These metrics are meant only for the servers,
Expand Down Expand Up @@ -736,7 +738,10 @@ private void checkServerConnectionsMicrometer(HttpServerRequest request) {
String address = formatSocketAddress(request.hostAddress());
boolean isHttp2 = request.requestHeaders().contains(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text());
checkGauge(SERVER_CONNECTIONS_TOTAL, true, 1, URI, HTTP, LOCAL_ADDRESS, address);
if (!isHttp2) {
if (isHttp2) {
checkGauge(SERVER_STREAMS_ACTIVE, true, 1, URI, HTTP, LOCAL_ADDRESS, address);
}
else {
checkGauge(SERVER_CONNECTIONS_ACTIVE, true, 1, URI, HTTP, LOCAL_ADDRESS, address);
}
}
Expand Down Expand Up @@ -940,6 +945,7 @@ static Stream<Arguments> httpCompatibleProtocols() {

private static final String SERVER_CONNECTIONS_ACTIVE = HTTP_SERVER_PREFIX + CONNECTIONS_ACTIVE;
private static final String SERVER_CONNECTIONS_TOTAL = HTTP_SERVER_PREFIX + CONNECTIONS_TOTAL;
private static final String SERVER_STREAMS_ACTIVE = HTTP_SERVER_PREFIX + STREAMS_ACTIVE;
private static final String SERVER_RESPONSE_TIME = HTTP_SERVER_PREFIX + RESPONSE_TIME;
private static final String SERVER_DATA_SENT_TIME = HTTP_SERVER_PREFIX + DATA_SENT_TIME;
private static final String SERVER_DATA_RECEIVED_TIME = HTTP_SERVER_PREFIX + DATA_RECEIVED_TIME;
Expand Down