From 8a2cf359a50aeeef6e0b0182c41b493b2d0b17ee Mon Sep 17 00:00:00 2001 From: Chris Larsen Date: Thu, 8 Dec 2022 15:58:33 -0800 Subject: [PATCH 1/4] Deregister pool metrics from Micrometer (or alternate registry) in order to release resources when there is a large churn of connection pools and endpoints. --- .../netty/resources/ConnectionProvider.java | 2 + ...ooledConnectionProviderMeterRegistrar.java | 13 +++ .../resources/PooledConnectionProvider.java | 42 +++++++++- .../resources/ConnectionProviderTest.java | 15 +++- ...edConnectionProviderCustomMetricsTest.java | 82 ++++++++++++++----- ...Http2ConnectionProviderMeterRegistrar.java | 10 +++ 6 files changed, 139 insertions(+), 25 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java index a7af81f2bc..428d9d0080 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java @@ -817,5 +817,7 @@ interface MeterRegistrar { void registerMetrics(String poolName, String id, SocketAddress remoteAddress, ConnectionPoolMetrics metrics); + void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress); + } } diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/MicrometerPooledConnectionProviderMeterRegistrar.java b/reactor-netty-core/src/main/java/reactor/netty/resources/MicrometerPooledConnectionProviderMeterRegistrar.java index 25b6a9c012..d16cf47b19 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/MicrometerPooledConnectionProviderMeterRegistrar.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/MicrometerPooledConnectionProviderMeterRegistrar.java @@ -18,6 +18,7 @@ import java.net.SocketAddress; import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.Tags; import reactor.netty.Metrics; import reactor.pool.InstrumentedPool; @@ -76,4 +77,16 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In .tags(tags) .register(REGISTRY); } + + void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) { + String addressAsString = Metrics.formatSocketAddress(remoteAddress); + Tags tags = Tags.of(ID.asString(), id, REMOTE_ADDRESS.asString(), addressAsString, NAME.asString(), poolName); + + REGISTRY.remove(new Meter.Id(TOTAL_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE)); + REGISTRY.remove(new Meter.Id(ACTIVE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE)); + REGISTRY.remove(new Meter.Id(IDLE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE)); + REGISTRY.remove(new Meter.Id(PENDING_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE)); + REGISTRY.remove(new Meter.Id(MAX_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE)); + REGISTRY.remove(new Meter.Id(MAX_PENDING_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE)); + } } diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index 5e6996c3ce..b305df0bcc 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -190,15 +190,34 @@ public final Mono disposeLater() { .stream() .map(e -> { Pool pool = e.getValue(); - if (pool instanceof GracefulShutdownInstrumentedPool) { + SocketAddress remoteAddress = e.getKey().holder; + String id = e.getKey().hashCode() + ""; + PoolFactory poolFactory = poolFactory(remoteAddress); + if (pool instanceof GracefulShutdownInstrumentedPool) { return ((GracefulShutdownInstrumentedPool) pool) .disposeGracefully(disposeTimeout) .onErrorResume(t -> { log.error("Connection pool for [{}] didn't shut down gracefully", e.getKey(), t); - return Mono.empty(); + return Mono.fromRunnable(() -> { + if (poolFactory.registrar != null) { + poolFactory.registrar.get().deRegisterMetrics(name, id, remoteAddress); + } + else if (Metrics.isMicrometerAvailable()) { + deRegisterDefaultMetrics(id, remoteAddress); + } + }); }); } - return pool.disposeLater(); + return pool.disposeLater().then( + Mono.fromRunnable(() -> { + if (poolFactory.registrar != null) { + poolFactory.registrar.get().deRegisterMetrics(name, id, remoteAddress); + } + else if (Metrics.isMicrometerAvailable()) { + deRegisterDefaultMetrics(id, remoteAddress); + } + }) + ); }) .collect(Collectors.toList()); if (pools.isEmpty()) { @@ -223,7 +242,18 @@ public final void disposeWhen(SocketAddress address) { if (log.isDebugEnabled()) { log.debug("ConnectionProvider[name={}]: Disposing pool for [{}]", name, e.getKey().fqdn); } - e.getValue().dispose(); + String id = e.getKey().hashCode() + ""; + PoolFactory poolFactory = poolFactory(address); + e.getValue().disposeLater().then( + Mono.fromRunnable(() -> { + if (poolFactory.registrar != null) { + poolFactory.registrar.get().deRegisterMetrics(name, id, address); + } + else if (Metrics.isMicrometerAvailable()) { + deRegisterDefaultMetrics(id, address); + } + }) + ).subscribe(); } }); } @@ -281,6 +311,10 @@ protected void registerDefaultMetrics(String id, SocketAddress remoteAddress, In MicrometerPooledConnectionProviderMeterRegistrar.INSTANCE.registerMetrics(name, id, remoteAddress, metrics); } + protected void deRegisterDefaultMetrics(String id, SocketAddress remoteAddress) { + MicrometerPooledConnectionProviderMeterRegistrar.INSTANCE.deRegisterMetrics(name, id, remoteAddress); + } + final boolean compareAddresses(SocketAddress origin, SocketAddress target) { if (origin.equals(target)) { return true; diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java index 18b62f4548..98518aa978 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java @@ -21,6 +21,7 @@ import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.net.SocketAddress; import java.time.Duration; import java.util.Collections; import java.util.Map; @@ -32,9 +33,21 @@ class ConnectionProviderTest { + static final ConnectionProvider.MeterRegistrar METER_REGISTRY = new ConnectionProvider.MeterRegistrar() { + @Override + public void registerMetrics(String poolName, String id, SocketAddress remoteAddress, ConnectionPoolMetrics metrics) { + + } + + @Override + public void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) { + + } + }; + static final TestAllocationStrategy TEST_ALLOCATION_STRATEGY = new TestAllocationStrategy(); static final String TEST_STRING = ""; - static final Supplier TEST_SUPPLIER = () -> (a, b, c, d) -> {}; + static final Supplier TEST_SUPPLIER = () -> METER_REGISTRY; static final BiFunction TEST_BI_FUNCTION = (r, duration) -> () -> {}; static final BiPredicate TEST_BI_PREDICATE = (conn, meta) -> true; diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/PooledConnectionProviderCustomMetricsTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/PooledConnectionProviderCustomMetricsTest.java index f5516ddded..1c02e98394 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/resources/PooledConnectionProviderCustomMetricsTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/resources/PooledConnectionProviderCustomMetricsTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.transport.ClientTransportConfig; @@ -69,36 +70,47 @@ void tearDown() throws Exception { @Test void customRegistrarIsUsed() { - AtomicBoolean used = new AtomicBoolean(); + AtomicBoolean registered = new AtomicBoolean(); + AtomicBoolean deRegistered = new AtomicBoolean(); - triggerAcquisition(true, () -> (a, b, c, d) -> used.set(true)); - assertThat(used.get()).isTrue(); + triggerAcquisition(true, () -> new MeterRegistrarImpl(registered, deRegistered, null)); + assertThat(registered.get()).isTrue(); + assertThat(deRegistered.get()).isFalse(); + + pool.dispose(); + assertThat(deRegistered.get()).isTrue(); } @Test void connectionPoolMaxMetrics() { - AtomicInteger maxAllocSize = new AtomicInteger(); - AtomicInteger maxPendingAcquireSize = new AtomicInteger(); - triggerAcquisition(true, () -> (a, b, c, d) -> { - maxAllocSize.set(d.maxAllocatedSize()); - maxPendingAcquireSize.set(d.maxPendingAcquireSize()); - }); - assertThat(maxAllocSize.get()).isEqualTo(MAX_ALLOC_SIZE); - assertThat(maxPendingAcquireSize.get()).isEqualTo(MAX_PENDING_ACQUIRE_SIZE); + AtomicInteger customMetric = new AtomicInteger(); + + triggerAcquisition(true, () -> new MeterRegistrarImpl(null, null, customMetric)); + assertThat(customMetric.get()).isEqualTo(MAX_ALLOC_SIZE); } @Test void customRegistrarSupplierNotInvokedWhenMetricsDisabled() { - AtomicBoolean used = new AtomicBoolean(); + AtomicBoolean registered = new AtomicBoolean(); - triggerAcquisition(false, () -> { - used.set(true); - return null; - }); - assertThat(used.get()).isFalse(); + triggerAcquisition(false, () -> new MeterRegistrarImpl(registered, null, null)); + assertThat(registered.get()).isFalse(); + } + + @Test + void disposeWhenMetricsDeregistered() { + AtomicBoolean registered = new AtomicBoolean(); + AtomicBoolean deRegistered = new AtomicBoolean(); + + Connection conn = triggerAcquisition(true, () -> new MeterRegistrarImpl(registered, deRegistered, null)); + assertThat(registered.get()).isTrue(); + assertThat(deRegistered.get()).isFalse(); + + pool.disposeWhen(remoteAddress.get()); + assertThat(deRegistered.get()).isTrue(); } - private void triggerAcquisition(boolean metricsEnabled, Supplier registrarSupplier) { + private Connection triggerAcquisition(boolean metricsEnabled, Supplier registrarSupplier) { pool = ConnectionProvider.builder("test") .metrics(metricsEnabled, registrarSupplier) .maxConnections(MAX_ALLOC_SIZE) @@ -108,16 +120,46 @@ private void triggerAcquisition(boolean metricsEnabled, Supplier { final EventLoopGroup group; diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java index b511356ced..b888c50d8d 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java @@ -16,6 +16,7 @@ package reactor.netty.http.client; import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.Tags; import reactor.netty.Metrics; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; @@ -59,4 +60,13 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In .tags(tags) .register(REGISTRY); } + + void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) { + String addressAsString = Metrics.formatSocketAddress(remoteAddress); + Tags tags = Tags.of(ID.asString(), id, REMOTE_ADDRESS.asString(), addressAsString, NAME.asString(), poolName); + REGISTRY.remove(new Meter.Id(ACTIVE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE)); + REGISTRY.remove(new Meter.Id(ACTIVE_STREAMS.getName(), tags, null, null, Meter.Type.GAUGE)); + REGISTRY.remove(new Meter.Id(IDLE_CONNECTIONS.getName(), tags, null, null, Meter.Type.GAUGE)); + REGISTRY.remove(new Meter.Id(PENDING_STREAMS.getName(), tags, null, null, Meter.Type.GAUGE)); + } } \ No newline at end of file From 9138d341932668dea1866da299f4a303cccef2d9 Mon Sep 17 00:00:00 2001 From: Chris Larsen Date: Fri, 9 Dec 2022 10:40:32 -0800 Subject: [PATCH 2/4] Address code review. Provide a default deRegister implementation to avoid interface changes for now. Remove unecessary Connection return from the unit metrics unit test. Thanks @violetagg --- .../netty/resources/ConnectionProvider.java | 12 +++++++++++- .../netty/resources/ConnectionProviderTest.java | 17 ++--------------- ...oledConnectionProviderCustomMetricsTest.java | 17 +++++++++-------- .../http/client/Http2ConnectionProvider.java | 6 ++++++ 4 files changed, 28 insertions(+), 24 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java index 428d9d0080..e4c6e844f7 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java @@ -817,7 +817,17 @@ interface MeterRegistrar { void registerMetrics(String poolName, String id, SocketAddress remoteAddress, ConnectionPoolMetrics metrics); - void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress); + /** + * Invoked when a connection pool is disposed. + * + * @param poolName the pool name + * @param id the pool id + * @param remoteAddress the remote address + * @since 1.0.26 + */ + default void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) { + + } } } diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java index 98518aa978..017a9841a5 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java @@ -21,7 +21,6 @@ import java.lang.reflect.Field; import java.lang.reflect.Modifier; -import java.net.SocketAddress; import java.time.Duration; import java.util.Collections; import java.util.Map; @@ -33,21 +32,9 @@ class ConnectionProviderTest { - static final ConnectionProvider.MeterRegistrar METER_REGISTRY = new ConnectionProvider.MeterRegistrar() { - @Override - public void registerMetrics(String poolName, String id, SocketAddress remoteAddress, ConnectionPoolMetrics metrics) { - - } - - @Override - public void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) { - - } - }; - static final TestAllocationStrategy TEST_ALLOCATION_STRATEGY = new TestAllocationStrategy(); static final String TEST_STRING = ""; - static final Supplier TEST_SUPPLIER = () -> METER_REGISTRY; + static final Supplier TEST_SUPPLIER = () -> (a, b, c, d) -> {}; static final BiFunction TEST_BI_FUNCTION = (r, duration) -> () -> {}; static final BiPredicate TEST_BI_PREDICATE = (conn, meta) -> true; @@ -140,4 +127,4 @@ public int permitMaximum() { public void returnPermits(int returned) { } } -} +} \ No newline at end of file diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/PooledConnectionProviderCustomMetricsTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/PooledConnectionProviderCustomMetricsTest.java index 1c02e98394..7da25fe74f 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/resources/PooledConnectionProviderCustomMetricsTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/resources/PooledConnectionProviderCustomMetricsTest.java @@ -34,10 +34,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.transport.ClientTransportConfig; +import reactor.util.annotation.Nullable; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -102,7 +102,7 @@ void disposeWhenMetricsDeregistered() { AtomicBoolean registered = new AtomicBoolean(); AtomicBoolean deRegistered = new AtomicBoolean(); - Connection conn = triggerAcquisition(true, () -> new MeterRegistrarImpl(registered, deRegistered, null)); + triggerAcquisition(true, () -> new MeterRegistrarImpl(registered, deRegistered, null)); assertThat(registered.get()).isTrue(); assertThat(deRegistered.get()).isFalse(); @@ -110,7 +110,7 @@ void disposeWhenMetricsDeregistered() { assertThat(deRegistered.get()).isTrue(); } - private Connection triggerAcquisition(boolean metricsEnabled, Supplier registrarSupplier) { + private void triggerAcquisition(boolean metricsEnabled, Supplier registrarSupplier) { pool = ConnectionProvider.builder("test") .metrics(metricsEnabled, registrarSupplier) .maxConnections(MAX_ALLOC_SIZE) @@ -120,16 +120,14 @@ private Connection triggerAcquisition(boolean metricsEnabled, Supplier { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index 90028c227d..5d83544e2c 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -131,6 +131,12 @@ protected void registerDefaultMetrics(String id, SocketAddress remoteAddress, In .registerMetrics(name(), id, remoteAddress, metrics); } + @Override + protected void deRegisterDefaultMetrics(String id, SocketAddress remoteAddress) { + MicrometerHttp2ConnectionProviderMeterRegistrar.INSTANCE + .deRegisterMetrics(name(), id, remoteAddress); + } + static void invalidate(@Nullable ConnectionObserver owner) { if (owner instanceof DisposableAcquire) { DisposableAcquire da = (DisposableAcquire) owner; From e58a81ca9c85384fde01dfa3dc890bba89b1f1d7 Mon Sep 17 00:00:00 2001 From: Chris Larsen Date: Fri, 9 Dec 2022 11:13:09 -0800 Subject: [PATCH 3/4] Fix the `testConnectionPoolPendingAcquireSize` test to detect deregistration of the pool metrics after disposal. --- .../resources/PooledConnectionProviderDefaultMetricsTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java index a466ff3982..b84da88b8d 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java @@ -293,7 +293,8 @@ void testConnectionPoolPendingAcquireSize() throws Exception { provider.disposeLater() .block(Duration.ofSeconds(30)); } - assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, "http2.testConnectionPoolPendingAcquireSize")).isEqualTo(0); + // deRegistered + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, "http2.testConnectionPoolPendingAcquireSize")).isEqualTo(-1); } private double getGaugeValue(String gaugeName, String poolName) { From 6e7f5f7c698f8cf50b5ab65a394ddbff384c9b0d Mon Sep 17 00:00:00 2001 From: Chris Larsen Date: Fri, 9 Dec 2022 12:57:19 -0800 Subject: [PATCH 4/4] Restore new line at the end of ConnectionProviderTest.java --- .../java/reactor/netty/resources/ConnectionProviderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java index 017a9841a5..18b62f4548 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java @@ -127,4 +127,4 @@ public int permitMaximum() { public void returnPermits(int returned) { } } -} \ No newline at end of file +}