Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deregister connection pool metrics from Micrometer (or alternate registry) when disposing the connection pool #2608

Merged
merged 4 commits into from Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -817,5 +817,17 @@ interface MeterRegistrar {

void registerMetrics(String poolName, String id, SocketAddress remoteAddress, ConnectionPoolMetrics metrics);

/**
* Invoked when a connection pool is disposed.
*
* @param poolName the pool name
* @param id the pool id
* @param remoteAddress the remote address
* @since 1.0.26
*/
default void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) {

}

}
}
Expand Up @@ -18,6 +18,7 @@
import java.net.SocketAddress;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tags;
import reactor.netty.Metrics;
import reactor.pool.InstrumentedPool;
Expand Down Expand Up @@ -76,4 +77,16 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In
.tags(tags)
.register(REGISTRY);
}

void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) {
String addressAsString = Metrics.formatSocketAddress(remoteAddress);
Tags tags = Tags.of(ID.asString(), id, REMOTE_ADDRESS.asString(), addressAsString, NAME.asString(), poolName);

REGISTRY.remove(new Meter.Id(TOTAL_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE));
REGISTRY.remove(new Meter.Id(ACTIVE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE));
REGISTRY.remove(new Meter.Id(IDLE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE));
REGISTRY.remove(new Meter.Id(PENDING_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE));
REGISTRY.remove(new Meter.Id(MAX_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE));
REGISTRY.remove(new Meter.Id(MAX_PENDING_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE));
}
}
Expand Up @@ -190,15 +190,34 @@ public final Mono<Void> disposeLater() {
.stream()
.map(e -> {
Pool<T> pool = e.getValue();
if (pool instanceof GracefulShutdownInstrumentedPool) {
SocketAddress remoteAddress = e.getKey().holder;
String id = e.getKey().hashCode() + "";
PoolFactory<T> poolFactory = poolFactory(remoteAddress);
if (pool instanceof GracefulShutdownInstrumentedPool) {
return ((GracefulShutdownInstrumentedPool<T>) pool)
.disposeGracefully(disposeTimeout)
.onErrorResume(t -> {
log.error("Connection pool for [{}] didn't shut down gracefully", e.getKey(), t);
return Mono.empty();
return Mono.fromRunnable(() -> {
if (poolFactory.registrar != null) {
poolFactory.registrar.get().deRegisterMetrics(name, id, remoteAddress);
}
else if (Metrics.isMicrometerAvailable()) {
deRegisterDefaultMetrics(id, remoteAddress);
}
});
});
}
return pool.disposeLater();
return pool.disposeLater().then(
Mono.<Void>fromRunnable(() -> {
if (poolFactory.registrar != null) {
poolFactory.registrar.get().deRegisterMetrics(name, id, remoteAddress);
}
else if (Metrics.isMicrometerAvailable()) {
deRegisterDefaultMetrics(id, remoteAddress);
}
})
);
})
.collect(Collectors.toList());
if (pools.isEmpty()) {
Expand All @@ -223,7 +242,18 @@ public final void disposeWhen(SocketAddress address) {
if (log.isDebugEnabled()) {
log.debug("ConnectionProvider[name={}]: Disposing pool for [{}]", name, e.getKey().fqdn);
}
e.getValue().dispose();
String id = e.getKey().hashCode() + "";
PoolFactory<T> poolFactory = poolFactory(address);
e.getValue().disposeLater().then(
Mono.<Void>fromRunnable(() -> {
if (poolFactory.registrar != null) {
poolFactory.registrar.get().deRegisterMetrics(name, id, address);
}
else if (Metrics.isMicrometerAvailable()) {
deRegisterDefaultMetrics(id, address);
}
})
).subscribe();
}
});
}
Expand Down Expand Up @@ -281,6 +311,10 @@ protected void registerDefaultMetrics(String id, SocketAddress remoteAddress, In
MicrometerPooledConnectionProviderMeterRegistrar.INSTANCE.registerMetrics(name, id, remoteAddress, metrics);
}

protected void deRegisterDefaultMetrics(String id, SocketAddress remoteAddress) {
MicrometerPooledConnectionProviderMeterRegistrar.INSTANCE.deRegisterMetrics(name, id, remoteAddress);
}

final boolean compareAddresses(SocketAddress origin, SocketAddress target) {
if (origin.equals(target)) {
return true;
Expand Down
Expand Up @@ -127,4 +127,4 @@ public int permitMaximum() {
public void returnPermits(int returned) {
}
}
}
}
manolama marked this conversation as resolved.
Show resolved Hide resolved
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,6 +37,7 @@
import reactor.netty.ConnectionObserver;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.transport.ClientTransportConfig;
import reactor.util.annotation.Nullable;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
Expand Down Expand Up @@ -69,33 +70,44 @@ void tearDown() throws Exception {

@Test
void customRegistrarIsUsed() {
AtomicBoolean used = new AtomicBoolean();
AtomicBoolean registered = new AtomicBoolean();
AtomicBoolean deRegistered = new AtomicBoolean();

triggerAcquisition(true, () -> (a, b, c, d) -> used.set(true));
assertThat(used.get()).isTrue();
triggerAcquisition(true, () -> new MeterRegistrarImpl(registered, deRegistered, null));
assertThat(registered.get()).isTrue();
assertThat(deRegistered.get()).isFalse();

pool.dispose();
assertThat(deRegistered.get()).isTrue();
}

@Test
void connectionPoolMaxMetrics() {
AtomicInteger maxAllocSize = new AtomicInteger();
AtomicInteger maxPendingAcquireSize = new AtomicInteger();
triggerAcquisition(true, () -> (a, b, c, d) -> {
maxAllocSize.set(d.maxAllocatedSize());
maxPendingAcquireSize.set(d.maxPendingAcquireSize());
});
assertThat(maxAllocSize.get()).isEqualTo(MAX_ALLOC_SIZE);
assertThat(maxPendingAcquireSize.get()).isEqualTo(MAX_PENDING_ACQUIRE_SIZE);
AtomicInteger customMetric = new AtomicInteger();

triggerAcquisition(true, () -> new MeterRegistrarImpl(null, null, customMetric));
assertThat(customMetric.get()).isEqualTo(MAX_ALLOC_SIZE);
}

@Test
void customRegistrarSupplierNotInvokedWhenMetricsDisabled() {
AtomicBoolean used = new AtomicBoolean();
AtomicBoolean registered = new AtomicBoolean();

triggerAcquisition(false, () -> {
used.set(true);
return null;
});
assertThat(used.get()).isFalse();
triggerAcquisition(false, () -> new MeterRegistrarImpl(registered, null, null));
assertThat(registered.get()).isFalse();
}

@Test
void disposeWhenMetricsDeregistered() {
AtomicBoolean registered = new AtomicBoolean();
AtomicBoolean deRegistered = new AtomicBoolean();

triggerAcquisition(true, () -> new MeterRegistrarImpl(registered, deRegistered, null));
assertThat(registered.get()).isTrue();
assertThat(deRegistered.get()).isFalse();

pool.disposeWhen(remoteAddress.get());
assertThat(deRegistered.get()).isTrue();
}

private void triggerAcquisition(boolean metricsEnabled, Supplier<ConnectionProvider.MeterRegistrar> registrarSupplier) {
Expand All @@ -118,6 +130,37 @@ private void triggerAcquisition(boolean metricsEnabled, Supplier<ConnectionProvi
}
}

static final class MeterRegistrarImpl implements ConnectionProvider.MeterRegistrar {
AtomicBoolean registered;
AtomicBoolean deRegistered;
AtomicInteger customMetric;

MeterRegistrarImpl(
@Nullable AtomicBoolean registered,
@Nullable AtomicBoolean deRegistered,
@Nullable AtomicInteger customMetric) {
this.registered = registered;
this.deRegistered = deRegistered;
this.customMetric = customMetric;
}
@Override
public void registerMetrics(String poolName, String id, SocketAddress remoteAddress, ConnectionPoolMetrics metrics) {
if (registered != null) {
registered.set(true);
}
if (customMetric != null) {
customMetric.set(metrics.maxAllocatedSize());
}
}

@Override
public void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) {
if (deRegistered != null) {
deRegistered.set(true);
}
}
}

static final class ClientTransportConfigImpl extends ClientTransportConfig<ClientTransportConfigImpl> {

final EventLoopGroup group;
Expand Down
Expand Up @@ -131,6 +131,12 @@ protected void registerDefaultMetrics(String id, SocketAddress remoteAddress, In
.registerMetrics(name(), id, remoteAddress, metrics);
}

@Override
protected void deRegisterDefaultMetrics(String id, SocketAddress remoteAddress) {
MicrometerHttp2ConnectionProviderMeterRegistrar.INSTANCE
.deRegisterMetrics(name(), id, remoteAddress);
}

static void invalidate(@Nullable ConnectionObserver owner) {
if (owner instanceof DisposableAcquire) {
DisposableAcquire da = (DisposableAcquire) owner;
Expand Down
Expand Up @@ -16,6 +16,7 @@
package reactor.netty.http.client;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tags;
import reactor.netty.Metrics;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
Expand Down Expand Up @@ -59,4 +60,13 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In
.tags(tags)
.register(REGISTRY);
}

void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) {
manolama marked this conversation as resolved.
Show resolved Hide resolved
String addressAsString = Metrics.formatSocketAddress(remoteAddress);
Tags tags = Tags.of(ID.asString(), id, REMOTE_ADDRESS.asString(), addressAsString, NAME.asString(), poolName);
REGISTRY.remove(new Meter.Id(ACTIVE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE));
REGISTRY.remove(new Meter.Id(ACTIVE_STREAMS.getName(), tags, null, null, Meter.Type.GAUGE));
REGISTRY.remove(new Meter.Id(IDLE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE));
REGISTRY.remove(new Meter.Id(PENDING_STREAMS.getName(), tags, null, null, Meter.Type.GAUGE));
}
}
Expand Up @@ -293,7 +293,8 @@ void testConnectionPoolPendingAcquireSize() throws Exception {
provider.disposeLater()
.block(Duration.ofSeconds(30));
}
assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, "http2.testConnectionPoolPendingAcquireSize")).isEqualTo(0);
// deRegistered
assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, "http2.testConnectionPoolPendingAcquireSize")).isEqualTo(-1);
}

private double getGaugeValue(String gaugeName, String poolName) {
Expand Down