Skip to content

Commit

Permalink
Merge #3230 into 1.2.0-M2
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed May 7, 2024
2 parents c37eec6 + 4fdac19 commit 4a14be4
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 209 deletions.
2 changes: 2 additions & 0 deletions reactor-netty-core/src/main/java/reactor/netty/Metrics.java
Expand Up @@ -293,6 +293,8 @@ public class Metrics {

public static final String UNKNOWN = "UNKNOWN";

public static final String NA = "na";

@Nullable
public static Observation currentObservation(ContextView contextView) {
if (contextView.hasKey(OBSERVATION_KEY)) {
Expand Down
Expand Up @@ -35,6 +35,7 @@

import static reactor.netty.Metrics.CONNECT_TIME;
import static reactor.netty.Metrics.ERROR;
import static reactor.netty.Metrics.NA;
import static reactor.netty.Metrics.OBSERVATION_REGISTRY;
import static reactor.netty.Metrics.SUCCESS;
import static reactor.netty.Metrics.TLS_HANDSHAKE_TIME;
Expand Down Expand Up @@ -110,12 +111,7 @@ public Observation.Context get() {

@Override
public Timer getTimer() {
if (proxyAddress == null) {
return recorder.getConnectTimer(getName(), netPeerName + ":" + netPeerPort, status);
}
else {
return recorder.getConnectTimer(getName(), netPeerName + ":" + netPeerPort, proxyAddress, status);
}
return recorder.getConnectTimer(getName(), netPeerName + ":" + netPeerPort, proxyAddress == null ? NA : proxyAddress, status);
}

@Override
Expand Down Expand Up @@ -205,13 +201,8 @@ public KeyValues getHighCardinalityKeyValues() {

@Override
public KeyValues getLowCardinalityKeyValues() {
if (proxyAddress == null) {
return KeyValues.of(REMOTE_ADDRESS.asString(), netPeerName + ":" + netPeerPort, STATUS.asString(), status);
}
else {
return KeyValues.of(REMOTE_ADDRESS.asString(), netPeerName + ":" + netPeerPort,
PROXY_ADDRESS.asString(), proxyAddress, STATUS.asString(), status);
}
return KeyValues.of(REMOTE_ADDRESS.asString(), netPeerName + ":" + netPeerPort,
PROXY_ADDRESS.asString(), proxyAddress == null ? NA : proxyAddress, STATUS.asString(), status);
}

@Override
Expand Down Expand Up @@ -349,12 +340,12 @@ public KeyValues getHighCardinalityKeyValues() {

@Override
public KeyValues getLowCardinalityKeyValues() {
if (proxyAddress == null) {
if (recorder.onServer) {
return KeyValues.of(REMOTE_ADDRESS.asString(), netPeerName + ':' + netPeerPort, STATUS.asString(), status);
}
else {
return KeyValues.of(REMOTE_ADDRESS.asString(), netPeerName + ':' + netPeerPort,
PROXY_ADDRESS.asString(), proxyAddress, STATUS.asString(), status);
PROXY_ADDRESS.asString(), proxyAddress == null ? NA : proxyAddress, STATUS.asString(), status);
}
}

Expand All @@ -375,12 +366,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {

@Override
public Timer getTimer() {
if (proxyAddress == null) {
return recorder.getTlsHandshakeTimer(getName(), netPeerName + ':' + netPeerPort, status);
}
else {
return recorder.getTlsHandshakeTimer(getName(), netPeerName + ':' + netPeerPort, proxyAddress, status);
}
return recorder.getTlsHandshakeTimer(getName(), netPeerName + ':' + netPeerPort, proxyAddress == null ? NA : proxyAddress, status);
}
}
}
Expand Up @@ -36,6 +36,7 @@
import static reactor.netty.Metrics.DATA_RECEIVED;
import static reactor.netty.Metrics.DATA_SENT;
import static reactor.netty.Metrics.ERRORS;
import static reactor.netty.Metrics.NA;
import static reactor.netty.Metrics.PROXY_ADDRESS;
import static reactor.netty.Metrics.REGISTRY;
import static reactor.netty.Metrics.REMOTE_ADDRESS;
Expand All @@ -50,13 +51,10 @@
* @since 0.9
*/
public class MicrometerChannelMetricsRecorder implements ChannelMetricsRecorder {
final ConcurrentMap<String, DistributionSummary> dataReceivedCacheNoProxy = new ConcurrentHashMap<>();
final ConcurrentMap<MeterKey, DistributionSummary> dataReceivedCache = new ConcurrentHashMap<>();

final ConcurrentMap<String, DistributionSummary> dataSentCacheNoProxy = new ConcurrentHashMap<>();
final ConcurrentMap<MeterKey, DistributionSummary> dataSentCache = new ConcurrentHashMap<>();

final ConcurrentMap<String, Counter> errorsCacheNoProxy = new ConcurrentHashMap<>();
final ConcurrentMap<MeterKey, Counter> errorsCache = new ConcurrentHashMap<>();

final ConcurrentMap<MeterKey, Timer> connectTimeCache = new ConcurrentHashMap<>();
Expand All @@ -69,113 +67,123 @@ public class MicrometerChannelMetricsRecorder implements ChannelMetricsRecorder

final String name;
final String protocol;
final boolean onServer;

public MicrometerChannelMetricsRecorder(String name, String protocol) {
this(name, protocol, true);
}

public MicrometerChannelMetricsRecorder(String name, String protocol, boolean onServer) {
this.name = name;
this.protocol = protocol;
this.onServer = onServer;
}

@Override
public void recordDataReceived(SocketAddress remoteAddress, long bytes) {
String address = formatSocketAddress(remoteAddress);
DistributionSummary ds = MapUtils.computeIfAbsent(dataReceivedCacheNoProxy, address,
key -> filter(DistributionSummary.builder(name + DATA_RECEIVED)
.baseUnit(ChannelMeters.DATA_RECEIVED.getBaseUnit())
.tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol,
ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address)
.register(REGISTRY)));
if (ds != null) {
ds.record(bytes);
}
recordDataReceived(remoteAddress, NA, bytes);
}

@Override
public void recordDataReceived(SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) {
recordDataReceived(remoteAddress, formatSocketAddress(proxyAddress), bytes);
}

void recordDataReceived(SocketAddress remoteAddress, @Nullable String proxyAddress, long bytes) {
String address = formatSocketAddress(remoteAddress);
String proxyAddr = formatSocketAddress(proxyAddress);
MeterKey meterKey = new MeterKey(null, address, proxyAddr, null, null);
DistributionSummary ds = MapUtils.computeIfAbsent(dataReceivedCache, meterKey,
key -> filter(DistributionSummary.builder(name + DATA_RECEIVED)
.baseUnit(ChannelMeters.DATA_RECEIVED.getBaseUnit())
.tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol,
ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address,
ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddr)
.register(REGISTRY)));
MeterKey meterKey = new MeterKey(null, address, proxyAddress, null, null);
DistributionSummary ds = MapUtils.computeIfAbsent(dataReceivedCache, meterKey, key -> {
DistributionSummary.Builder builder =
DistributionSummary.builder(name + DATA_RECEIVED)
.baseUnit(ChannelMeters.DATA_RECEIVED.getBaseUnit())
.tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol,
ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address);
if (!onServer) {
builder.tag(ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddress);
}
return filter(builder.register(REGISTRY));
});
if (ds != null) {
ds.record(bytes);
}
}

@Override
public void recordDataSent(SocketAddress remoteAddress, long bytes) {
String address = formatSocketAddress(remoteAddress);
DistributionSummary ds = MapUtils.computeIfAbsent(dataSentCacheNoProxy, address,
key -> filter(DistributionSummary.builder(name + DATA_SENT)
.baseUnit(ChannelMeters.DATA_SENT.getBaseUnit())
.tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol,
ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address)
.register(REGISTRY)));
if (ds != null) {
ds.record(bytes);
}
recordDataSent(remoteAddress, NA, bytes);
}

@Override
public void recordDataSent(SocketAddress remoteAddress, SocketAddress proxyAddress, long bytes) {
recordDataSent(remoteAddress, formatSocketAddress(proxyAddress), bytes);
}

void recordDataSent(SocketAddress remoteAddress, @Nullable String proxyAddress, long bytes) {
String address = formatSocketAddress(remoteAddress);
String proxyAddr = formatSocketAddress(proxyAddress);
MeterKey meterKey = new MeterKey(null, address, proxyAddr, null, null);
DistributionSummary ds = MapUtils.computeIfAbsent(dataSentCache, meterKey,
key -> filter(DistributionSummary.builder(name + DATA_SENT)
.baseUnit(ChannelMeters.DATA_SENT.getBaseUnit())
.tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol,
ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address,
ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddr)
.register(REGISTRY)));
MeterKey meterKey = new MeterKey(null, address, proxyAddress, null, null);
DistributionSummary ds = MapUtils.computeIfAbsent(dataSentCache, meterKey, key -> {
DistributionSummary.Builder builder =
DistributionSummary.builder(name + DATA_SENT)
.baseUnit(ChannelMeters.DATA_SENT.getBaseUnit())
.tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol,
ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address);
if (!onServer) {
builder.tag(ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddress);
}
return filter(builder.register(REGISTRY));
});
if (ds != null) {
ds.record(bytes);
}
}

@Override
public void incrementErrorsCount(SocketAddress remoteAddress) {
String address = formatSocketAddress(remoteAddress);
Counter c = MapUtils.computeIfAbsent(errorsCacheNoProxy, address,
key -> filter(Counter.builder(name + ERRORS)
.tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol,
ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address)
.register(REGISTRY)));
if (c != null) {
c.increment();
}
incrementErrorsCount(remoteAddress, NA);
}

@Override
public void incrementErrorsCount(SocketAddress remoteAddress, SocketAddress proxyAddress) {
incrementErrorsCount(remoteAddress, formatSocketAddress(proxyAddress));
}

void incrementErrorsCount(SocketAddress remoteAddress, @Nullable String proxyAddress) {
String address = formatSocketAddress(remoteAddress);
String proxyAddr = formatSocketAddress(proxyAddress);
MeterKey meterKey = new MeterKey(null, address, proxyAddr, null, null);
Counter c = MapUtils.computeIfAbsent(errorsCache, meterKey,
key -> filter(Counter.builder(name + ERRORS)
.tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol,
ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address,
ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddr)
.register(REGISTRY)));
MeterKey meterKey = new MeterKey(null, address, proxyAddress, null, null);
Counter c = MapUtils.computeIfAbsent(errorsCache, meterKey, key -> {
Counter.Builder builder = Counter.builder(name + ERRORS)
.tags(ChannelMeters.ChannelMetersTags.URI.asString(), protocol,
ChannelMeters.ChannelMetersTags.REMOTE_ADDRESS.asString(), address);
if (!onServer) {
builder.tag(ChannelMeters.ChannelMetersTags.PROXY_ADDRESS.asString(), proxyAddress);
}
return filter(builder.register(REGISTRY));
});
if (c != null) {
c.increment();
}
}

@Override
public void recordTlsHandshakeTime(SocketAddress remoteAddress, Duration time, String status) {
String address = formatSocketAddress(remoteAddress);
Timer timer = getTlsHandshakeTimer(name + TLS_HANDSHAKE_TIME, address, status);
Timer timer = getTlsHandshakeTimer(name + TLS_HANDSHAKE_TIME, formatSocketAddress(remoteAddress), NA, status);
if (timer != null) {
timer.record(time);
}
}

/**
* Returns TLS handshake timer.
*
* @param name the timer name
* @param address the remote address
* @param status the status of the TLS handshake operation
* @return TLS handshake timer
* @deprecated as of 1.1.19. Prefer the {@link #getTlsHandshakeTimer(String, String, String, String)}.
* This method will be removed in version 1.3.0.
*/
@Nullable
@Deprecated
public final Timer getTlsHandshakeTimer(String name, @Nullable String address, String status) {
MeterKey meterKey = new MeterKey(null, address, null, null, status);
return MapUtils.computeIfAbsent(tlsHandshakeTimeCache, meterKey,
Expand All @@ -186,57 +194,55 @@ public final Timer getTlsHandshakeTimer(String name, @Nullable String address, S

@Override
public void recordTlsHandshakeTime(SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) {
String address = formatSocketAddress(remoteAddress);
String proxyAddr = formatSocketAddress(proxyAddress);
Timer timer = getTlsHandshakeTimer(name + TLS_HANDSHAKE_TIME, address, proxyAddr, status);
Timer timer = getTlsHandshakeTimer(name + TLS_HANDSHAKE_TIME, formatSocketAddress(remoteAddress), formatSocketAddress(proxyAddress), status);
if (timer != null) {
timer.record(time);
}
}

/**
* Returns TLS handshake timer.
*
* @param name the timer name
* @param remoteAddress the remote address
* @param proxyAddress the proxy address
* @param status the status of the TLS handshake operation
* @return TLS handshake timer
*/
@Nullable
public final Timer getTlsHandshakeTimer(String name, @Nullable String address, @Nullable String proxyAddr, String status) {
MeterKey meterKey = new MeterKey(null, address, proxyAddr, null, status);
return MapUtils.computeIfAbsent(tlsHandshakeTimeCache, meterKey,
key -> filter(Timer.builder(name)
.tags(REMOTE_ADDRESS, address, PROXY_ADDRESS, proxyAddr, STATUS, status)
.register(REGISTRY)));
public final Timer getTlsHandshakeTimer(String name, @Nullable String remoteAddress, @Nullable String proxyAddress, String status) {
MeterKey meterKey = new MeterKey(null, remoteAddress, proxyAddress, null, status);
return MapUtils.computeIfAbsent(tlsHandshakeTimeCache, meterKey, key -> {
Timer.Builder builder = Timer.builder(name).tags(REMOTE_ADDRESS, remoteAddress, STATUS, status);
if (!onServer) {
builder.tag(PROXY_ADDRESS, proxyAddress);
}
return filter(builder.register(REGISTRY));
});
}

@Override
public void recordConnectTime(SocketAddress remoteAddress, Duration time, String status) {
String address = formatSocketAddress(remoteAddress);
Timer timer = getConnectTimer(name + CONNECT_TIME, address, status);
Timer timer = getConnectTimer(name + CONNECT_TIME, formatSocketAddress(remoteAddress), NA, status);
if (timer != null) {
timer.record(time);
}
}

@Nullable
final Timer getConnectTimer(String name, @Nullable String address, String status) {
MeterKey meterKey = new MeterKey(null, address, null, null, status);
return MapUtils.computeIfAbsent(connectTimeCache, meterKey,
key -> filter(Timer.builder(name)
.tags(REMOTE_ADDRESS, address, STATUS, status)
.register(REGISTRY)));
}

@Override
public void recordConnectTime(SocketAddress remoteAddress, SocketAddress proxyAddress, Duration time, String status) {
String address = formatSocketAddress(remoteAddress);
String proxyAddr = formatSocketAddress(proxyAddress);
Timer timer = getConnectTimer(name + CONNECT_TIME, address, proxyAddr, status);
Timer timer = getConnectTimer(name + CONNECT_TIME, formatSocketAddress(remoteAddress), formatSocketAddress(proxyAddress), status);
if (timer != null) {
timer.record(time);
}
}

@Nullable
final Timer getConnectTimer(String name, @Nullable String address, @Nullable String proxyAddr, String status) {
MeterKey meterKey = new MeterKey(null, address, proxyAddr, null, status);
final Timer getConnectTimer(String name, @Nullable String remoteAddress, @Nullable String proxyAddress, String status) {
MeterKey meterKey = new MeterKey(null, remoteAddress, proxyAddress, null, status);
return MapUtils.computeIfAbsent(connectTimeCache, meterKey,
key -> filter(Timer.builder(name)
.tags(REMOTE_ADDRESS, address, PROXY_ADDRESS, proxyAddr, STATUS, status)
.tags(REMOTE_ADDRESS, remoteAddress, PROXY_ADDRESS, proxyAddress, STATUS, status)
.register(REGISTRY)));
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -149,7 +149,7 @@ static final class MicrometerTcpClientMetricsRecorder extends MicrometerChannelM
static final MicrometerTcpClientMetricsRecorder INSTANCE = new MicrometerTcpClientMetricsRecorder();

MicrometerTcpClientMetricsRecorder() {
super(reactor.netty.Metrics.TCP_CLIENT_PREFIX, "tcp");
super(reactor.netty.Metrics.TCP_CLIENT_PREFIX, "tcp", false);
}
}

Expand Down

0 comments on commit 4a14be4

Please sign in to comment.