diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index 33ca81f47c..07c076b3df 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -25,11 +25,14 @@ import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.core.tck.MeterRegistryAssert; import io.netty5.buffer.api.Buffer; -import io.netty5.channel.EventLoop; +import io.netty5.channel.ChannelHandlerAdapter; +import io.netty5.channel.ChannelHandlerContext; +import io.netty5.handler.codec.http.LastHttpContent; import io.netty5.handler.codec.http2.HttpConversionUtil; import io.netty5.handler.ssl.SslProvider; import io.netty5.handler.ssl.util.InsecureTrustManagerFactory; import io.netty5.handler.ssl.util.SelfSignedCertificate; +import io.netty5.util.concurrent.Future; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -62,7 +65,6 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -107,7 +109,6 @@ class HttpMetricsHandlerTests extends BaseHttpTest { static Http2SslContextSpec serverCtx2; static Http11SslContextSpec clientCtx11; static Http2SslContextSpec clientCtx2; - static final ConcurrentLinkedQueue eventLoops = new ConcurrentLinkedQueue<>(); @BeforeAll static void createSelfSignedCertificate() throws CertificateException { @@ -140,7 +141,6 @@ void setUp() { .host("127.0.0.1") .metrics(true, Function.identity()) .httpRequestDecoder(spec -> spec.h2cMaxContentLength(256)) - .doOnConnection(c -> eventLoops.add(c.channel().executor())) .route(r -> r.post("/1", (req, res) -> res.header("Connection", "close") .send(req.receive().transferOwnership().delayElements(Duration.ofMillis(10)))) .post("/2", (req, res) -> res.header("Connection", "close") @@ -162,7 +162,6 @@ void setUp() { @AfterEach void tearDown() { - eventLoops.clear(); provider.disposeLater() .block(Duration.ofSeconds(30)); @@ -175,12 +174,12 @@ void tearDown() { @MethodSource("httpCompatibleProtocols") void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedCloses = getExpectedCloses(negotiatedProtocol); - CountDownLatch latch1 = new CountDownLatch(expectedCloses); + int expectedDisconnects = getExpectedDiconnects(negotiatedProtocol); + CountDownLatch latch1 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true) .childObserve(observerDisconnect) .bindNow(); @@ -199,7 +198,7 @@ void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientP .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch1); + assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -220,7 +219,7 @@ else if (clientProtocols.length == 2 && checkExpectationsExisting("/1", sa.getHostString() + ":" + sa.getPort(), 1, serverCtx != null, numWrites[0], bytesWrite[0]); - CountDownLatch latch2 = new CountDownLatch(expectedCloses); + CountDownLatch latch2 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); latchRef.set(latch2); StepVerifier.create(httpClient.post() @@ -233,7 +232,7 @@ else if (clientProtocols.length == 2 && .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch2); + assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); sa = (InetSocketAddress) serverAddress.get(); checkExpectationsExisting("/2", sa.getHostString() + ":" + sa.getPort(), connIndex, serverCtx != null, @@ -292,14 +291,14 @@ void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedDisconnects = getExpectedCloses(negotiatedProtocol); + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { + int expectedDisconnects = getExpectedDiconnects(negotiatedProtocol); - CountDownLatch latch = new CountDownLatch(expectedDisconnects); + CountDownLatch latch = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); AtomicReference latchRef = new AtomicReference<>(latch); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true) .childObserve(observerDisconnect) .bindNow(); @@ -318,7 +317,7 @@ void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clie .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -348,7 +347,7 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. checkExpectationsNonExisting(sa.getHostString() + ":" + sa.getPort(), 1, 1, serverCtx != null, numWrites[0], numReads[0], bytesWrite[0], bytesRead[0]); - CountDownLatch latch2 = new CountDownLatch(expectedDisconnects); + CountDownLatch latch2 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); latchRef.set(latch2); StepVerifier.create(httpClient @@ -361,7 +360,7 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch2); + assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); sa = (InetSocketAddress) serverAddress.get(); checkExpectationsNonExisting(sa.getHostString() + ":" + sa.getPort(), connIndex, 2, serverCtx != null, @@ -372,12 +371,12 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. @MethodSource("httpCompatibleProtocols") void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedCloses = getExpectedCloses(negotiatedProtocol); - CountDownLatch latch1 = new CountDownLatch(expectedCloses); + int expectedDisconnects = getExpectedDiconnects(negotiatedProtocol); + CountDownLatch latch1 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true) .metrics(true, s -> "testUriTagValueResolver") .childObserve(observerDisconnect) .bindNow(); @@ -398,7 +397,7 @@ void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clie .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch1); + assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -424,13 +423,13 @@ else if (clientProtocols.length == 2 && @MethodSource("httpCompatibleProtocols") void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedCloses = getExpectedCloses(negotiatedProtocol); - CountDownLatch latch1 = new CountDownLatch(expectedCloses); + int expectedDisconnects = getExpectedDiconnects(negotiatedProtocol); + CountDownLatch latch1 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); disposableServer = - customizeServerOptions(httpServer, serverCtx, serverProtocols).metrics(true, + customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true).metrics(true, s -> { if ("/1".equals(s)) { return "testUriTagValueFunctionNotShared_1"; @@ -459,7 +458,7 @@ void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, H .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch1); + assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -477,7 +476,7 @@ else if (clientProtocols.length == 2 && checkExpectationsExisting("testUriTagValueFunctionNotShared_1", sa.getHostString() + ":" + sa.getPort(), 1, serverCtx != null, numWrites[0], bytesWrite[0]); - CountDownLatch latch2 = new CountDownLatch(expectedCloses); + CountDownLatch latch2 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); latchRef.set(latch2); httpClient.metrics(true, s -> "testUriTagValueFunctionNotShared_2") @@ -492,7 +491,7 @@ else if (clientProtocols.length == 2 && .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch2); + assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); sa = (InetSocketAddress) serverAddress.get(); @@ -526,7 +525,7 @@ void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtoc .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); assertThat(recorder.onDataReceivedContextView).isTrue(); assertThat(recorder.onDataSentContextView).isTrue(); @@ -559,7 +558,7 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); assertThat(recorder.onDataReceivedContextView).isTrue(); assertThat(recorder.onDataSentContextView).isTrue(); @@ -569,13 +568,13 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc @MethodSource("httpCompatibleProtocols") void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedCloses = getExpectedCloses(negotiatedProtocol); - CountDownLatch latch = new CountDownLatch(expectedCloses); + int expectedDisconnects = getExpectedDiconnects(negotiatedProtocol); + CountDownLatch latch = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); AtomicReference latchRef = new AtomicReference<>(latch); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11; - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true) .metrics(true, Function.identity()) .childObserve(observerDisconnect) .bindNow(); @@ -602,7 +601,7 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); // now check the server counters if (isHttp11) { @@ -659,7 +658,7 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[ .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); assertThat(ServerRecorder.INSTANCE.done.await(30, TimeUnit.SECONDS)).as("recorder latch await").isTrue(); if (isHttp11) { assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0); @@ -692,7 +691,7 @@ void testIssue896() throws Exception { .responseContent() .subscribe(); - awaitForHttpClientCompletion(latch); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) disposableServer.channel().localAddress(); String serverAddress = sa.getHostString() + ":" + sa.getPort(); @@ -705,19 +704,21 @@ void testIssue896() throws Exception { @MethodSource("http11CompatibleProtocols") void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { - CountDownLatch latch1 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events - AtomicReference latchRef = new AtomicReference<>(latch1); + CountDownLatch latch = new CountDownLatch(5); // expect to observe 2 server disconnect + 2 client disconnect events + 1 event when request is fully handled + AtomicReference latchRef = new AtomicReference<>(latch); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + // we need to register our latch handler using doOnChannelInit because here, the HttpTrafficHandler will return an http response error + // without ever calling doOnConnection callback (see for example HttpTrafficHandler.channelRead method). + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, false) .httpRequestDecoder(spec -> spec.maxHeaderSize(32)) .childObserve(observerDisconnect) .bindNow(); AtomicReference serverAddress = new AtomicReference<>(); - httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols).doAfterRequest((req, conn) -> - serverAddress.set(conn.channel().remoteAddress()) - ).observe(observerDisconnect); + httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) + .observe(observerDisconnect); httpClient.get() .uri("/max_header_size") @@ -727,7 +728,7 @@ void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtoco .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch1); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -742,19 +743,6 @@ private ConnectionObserver observeDisconnect(AtomicReference lat }; } - private void awaitForHttpClientCompletion(CountDownLatch latch) throws InterruptedException { - // Wait for HttpClient response to be complete - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - - // since Netty PR #9489, channel listeners execution are not synchronous anymore and are rescheduled in event loop queues. - // So, to prevent situation where an HttpClient response is available while any server pending listeners are not yet executed, - // we need to ensure that all event loop are idle. This is necessary, else we may start to test metrics while they are not up-to-date - CountDownLatch idleEventLoopslatch = new CountDownLatch(eventLoops.size()); - eventLoops.forEach(el -> el.execute(idleEventLoopslatch::countDown)); - idleEventLoopslatch.await(30, TimeUnit.SECONDS); - assertThat(idleEventLoopslatch.await(30, TimeUnit.SECONDS)).as("event loop idleness checker task failed").isTrue(); - } - private void checkServerConnectionsMicrometer(HttpServerRequest request) { String address = formatSocketAddress(request.hostAddress()); boolean isHttp2 = request.requestHeaders().contains(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text()); @@ -877,11 +865,24 @@ private void checkExpectationsBadRequest(String serverAddress, boolean checkTls) } HttpServer customizeServerOptions(HttpServer httpServer, @Nullable ProtocolSslContextSpec ctx, HttpProtocol[] protocols) { - return ctx == null ? httpServer.protocol(protocols) : httpServer.protocol(protocols).secure(spec -> spec.sslContext(ctx)); + return customizeServerOptions(httpServer, ctx, protocols, null, false); + } + + HttpServer customizeServerOptions(HttpServer httpServer, @Nullable ProtocolSslContextSpec ctx, HttpProtocol[] protocols, + @Nullable AtomicReference latchRef, boolean registerLatchHandlerOnConnection) { + HttpServer server = ctx == null ? httpServer.protocol(protocols) : httpServer.protocol(protocols).secure(spec -> spec.sslContext(ctx)); + if (latchRef != null) { + RequestCompletedHandler handler = RequestCompletedHandler.INSTANCE; + handler.reset(latchRef); + server = registerLatchHandlerOnConnection ? + server.doOnConnection(connection -> connection.channel().pipeline().addLast(handler)) : + server.doOnChannelInit((connectionObserver, channel, socketAddress) -> channel.pipeline().addLast(handler)); + } + + return server; } HttpClient customizeClientOptions(HttpClient httpClient, @Nullable ProtocolSslContextSpec ctx, HttpProtocol[] protocols) { - httpClient.doOnConnected(connection -> eventLoops.add(connection.channel().executor())); return ctx == null ? httpClient.protocol(protocols) : httpClient.protocol(protocols).secure(spec -> spec.sslContext(ctx)); } @@ -932,7 +933,7 @@ void checkGauge(String name, boolean exists, double expectedCount, String... tag * @param protocol the protocol used (for HTTP11, we expect to observe 4 disconnect events, and for other (H2/H2C), we expect 3 events)). * @return number of disconnect events we expect to observe on a given connection */ - int getExpectedCloses(HttpProtocol protocol) { + int getExpectedDiconnects(HttpProtocol protocol) { return switch (protocol) { case H2, H2C -> 3; case HTTP11 -> 4; @@ -1208,4 +1209,31 @@ public void recordConnectTime(SocketAddress socketAddress, Duration duration, St public void recordResolveAddressTime(SocketAddress socketAddress, Duration duration, String s) { } } + + /** + * Handler used to ensure that the request has completed on the server. + */ + static final class RequestCompletedHandler extends ChannelHandlerAdapter { + static final RequestCompletedHandler INSTANCE = new RequestCompletedHandler(); + private AtomicReference latchRef = new AtomicReference<>(null); + + void reset(AtomicReference latchRef) { + this.latchRef = latchRef; + } + + @Override + public Future write(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof LastHttpContent) { + return ctx.write(msg).addListener(future -> latchRef.get().countDown()); + } + else { + return ctx.write(msg); + } + } + + @Override + public boolean isSharable() { + return true; // A server may accept multiple connections, hence this handler must be sharable + } + } }