From 0f953310953d1c6eb425e1713495b1599f4b1ad3 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Wed, 6 Jul 2022 14:24:31 +0200 Subject: [PATCH] Make HttpMetricsHandlerTests stable (#2353) This PR is an attempt to stabilise the HttpMetricsHandlerTests, because now in netty5, channel listeners execution is rescheduled at the end of the current event loop queue, see Netty PR #9489 (Always notify FutureListener via the EventExecutor) So, this is problem for the HttpMetricsHandlerTests tests, because when we receive a response from a server, we start to verify metrics, while the metrics may be not be already up-to-date because they are updated using write listeners, which may not be executed yet at the time the client receives the response. This PR ensures that when the server sends the last response, then all listeners are fully executed. We simply use a channel handler which is placed at the end of the pipeline. So, when the server sends the full response, the write listeners will be executed in reverse order (from left to right, so our channel handler listener will be executed lastly). Related to #1873 --- .../netty/http/HttpMetricsHandlerTests.java | 146 +++++++++++------- 1 file changed, 87 insertions(+), 59 deletions(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index 33ca81f47c..07c076b3df 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -25,11 +25,14 @@ import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.core.tck.MeterRegistryAssert; import io.netty5.buffer.api.Buffer; -import io.netty5.channel.EventLoop; +import io.netty5.channel.ChannelHandlerAdapter; +import io.netty5.channel.ChannelHandlerContext; +import io.netty5.handler.codec.http.LastHttpContent; import io.netty5.handler.codec.http2.HttpConversionUtil; import io.netty5.handler.ssl.SslProvider; import io.netty5.handler.ssl.util.InsecureTrustManagerFactory; import io.netty5.handler.ssl.util.SelfSignedCertificate; +import io.netty5.util.concurrent.Future; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -62,7 +65,6 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -107,7 +109,6 @@ class HttpMetricsHandlerTests extends BaseHttpTest { static Http2SslContextSpec serverCtx2; static Http11SslContextSpec clientCtx11; static Http2SslContextSpec clientCtx2; - static final ConcurrentLinkedQueue eventLoops = new ConcurrentLinkedQueue<>(); @BeforeAll static void createSelfSignedCertificate() throws CertificateException { @@ -140,7 +141,6 @@ void setUp() { .host("127.0.0.1") .metrics(true, Function.identity()) .httpRequestDecoder(spec -> spec.h2cMaxContentLength(256)) - .doOnConnection(c -> eventLoops.add(c.channel().executor())) .route(r -> r.post("/1", (req, res) -> res.header("Connection", "close") .send(req.receive().transferOwnership().delayElements(Duration.ofMillis(10)))) .post("/2", (req, res) -> res.header("Connection", "close") @@ -162,7 +162,6 @@ void setUp() { @AfterEach void tearDown() { - eventLoops.clear(); provider.disposeLater() .block(Duration.ofSeconds(30)); @@ -175,12 +174,12 @@ void tearDown() { @MethodSource("httpCompatibleProtocols") void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedCloses = getExpectedCloses(negotiatedProtocol); - CountDownLatch latch1 = new CountDownLatch(expectedCloses); + int expectedDisconnects = getExpectedDiconnects(negotiatedProtocol); + CountDownLatch latch1 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true) .childObserve(observerDisconnect) .bindNow(); @@ -199,7 +198,7 @@ void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientP .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch1); + assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -220,7 +219,7 @@ else if (clientProtocols.length == 2 && checkExpectationsExisting("/1", sa.getHostString() + ":" + sa.getPort(), 1, serverCtx != null, numWrites[0], bytesWrite[0]); - CountDownLatch latch2 = new CountDownLatch(expectedCloses); + CountDownLatch latch2 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); latchRef.set(latch2); StepVerifier.create(httpClient.post() @@ -233,7 +232,7 @@ else if (clientProtocols.length == 2 && .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch2); + assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); sa = (InetSocketAddress) serverAddress.get(); checkExpectationsExisting("/2", sa.getHostString() + ":" + sa.getPort(), connIndex, serverCtx != null, @@ -292,14 +291,14 @@ void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedDisconnects = getExpectedCloses(negotiatedProtocol); + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { + int expectedDisconnects = getExpectedDiconnects(negotiatedProtocol); - CountDownLatch latch = new CountDownLatch(expectedDisconnects); + CountDownLatch latch = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); AtomicReference latchRef = new AtomicReference<>(latch); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true) .childObserve(observerDisconnect) .bindNow(); @@ -318,7 +317,7 @@ void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clie .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -348,7 +347,7 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. checkExpectationsNonExisting(sa.getHostString() + ":" + sa.getPort(), 1, 1, serverCtx != null, numWrites[0], numReads[0], bytesWrite[0], bytesRead[0]); - CountDownLatch latch2 = new CountDownLatch(expectedDisconnects); + CountDownLatch latch2 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); latchRef.set(latch2); StepVerifier.create(httpClient @@ -361,7 +360,7 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch2); + assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); sa = (InetSocketAddress) serverAddress.get(); checkExpectationsNonExisting(sa.getHostString() + ":" + sa.getPort(), connIndex, 2, serverCtx != null, @@ -372,12 +371,12 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. @MethodSource("httpCompatibleProtocols") void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedCloses = getExpectedCloses(negotiatedProtocol); - CountDownLatch latch1 = new CountDownLatch(expectedCloses); + int expectedDisconnects = getExpectedDiconnects(negotiatedProtocol); + CountDownLatch latch1 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true) .metrics(true, s -> "testUriTagValueResolver") .childObserve(observerDisconnect) .bindNow(); @@ -398,7 +397,7 @@ void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clie .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch1); + assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -424,13 +423,13 @@ else if (clientProtocols.length == 2 && @MethodSource("httpCompatibleProtocols") void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedCloses = getExpectedCloses(negotiatedProtocol); - CountDownLatch latch1 = new CountDownLatch(expectedCloses); + int expectedDisconnects = getExpectedDiconnects(negotiatedProtocol); + CountDownLatch latch1 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); disposableServer = - customizeServerOptions(httpServer, serverCtx, serverProtocols).metrics(true, + customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true).metrics(true, s -> { if ("/1".equals(s)) { return "testUriTagValueFunctionNotShared_1"; @@ -459,7 +458,7 @@ void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, H .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch1); + assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -477,7 +476,7 @@ else if (clientProtocols.length == 2 && checkExpectationsExisting("testUriTagValueFunctionNotShared_1", sa.getHostString() + ":" + sa.getPort(), 1, serverCtx != null, numWrites[0], bytesWrite[0]); - CountDownLatch latch2 = new CountDownLatch(expectedCloses); + CountDownLatch latch2 = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); latchRef.set(latch2); httpClient.metrics(true, s -> "testUriTagValueFunctionNotShared_2") @@ -492,7 +491,7 @@ else if (clientProtocols.length == 2 && .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch2); + assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); sa = (InetSocketAddress) serverAddress.get(); @@ -526,7 +525,7 @@ void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtoc .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); assertThat(recorder.onDataReceivedContextView).isTrue(); assertThat(recorder.onDataSentContextView).isTrue(); @@ -559,7 +558,7 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); assertThat(recorder.onDataReceivedContextView).isTrue(); assertThat(recorder.onDataSentContextView).isTrue(); @@ -569,13 +568,13 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc @MethodSource("httpCompatibleProtocols") void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - int expectedCloses = getExpectedCloses(negotiatedProtocol); - CountDownLatch latch = new CountDownLatch(expectedCloses); + int expectedDisconnects = getExpectedDiconnects(negotiatedProtocol); + CountDownLatch latch = new CountDownLatch(expectedDisconnects + 1 /* server metrics are updated, see customizeServerOptions method */); AtomicReference latchRef = new AtomicReference<>(latch); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); boolean isHttp11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11; - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, true) .metrics(true, Function.identity()) .childObserve(observerDisconnect) .bindNow(); @@ -602,7 +601,7 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); // now check the server counters if (isHttp11) { @@ -659,7 +658,7 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[ .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); assertThat(ServerRecorder.INSTANCE.done.await(30, TimeUnit.SECONDS)).as("recorder latch await").isTrue(); if (isHttp11) { assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0); @@ -692,7 +691,7 @@ void testIssue896() throws Exception { .responseContent() .subscribe(); - awaitForHttpClientCompletion(latch); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) disposableServer.channel().localAddress(); String serverAddress = sa.getHostString() + ":" + sa.getPort(); @@ -705,19 +704,21 @@ void testIssue896() throws Exception { @MethodSource("http11CompatibleProtocols") void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { - CountDownLatch latch1 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events - AtomicReference latchRef = new AtomicReference<>(latch1); + CountDownLatch latch = new CountDownLatch(5); // expect to observe 2 server disconnect + 2 client disconnect events + 1 event when request is fully handled + AtomicReference latchRef = new AtomicReference<>(latch); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); - disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + // we need to register our latch handler using doOnChannelInit because here, the HttpTrafficHandler will return an http response error + // without ever calling doOnConnection callback (see for example HttpTrafficHandler.channelRead method). + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols, latchRef, false) .httpRequestDecoder(spec -> spec.maxHeaderSize(32)) .childObserve(observerDisconnect) .bindNow(); AtomicReference serverAddress = new AtomicReference<>(); - httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols).doAfterRequest((req, conn) -> - serverAddress.set(conn.channel().remoteAddress()) - ).observe(observerDisconnect); + httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols) + .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress())) + .observe(observerDisconnect); httpClient.get() .uri("/max_header_size") @@ -727,7 +728,7 @@ void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtoco .expectComplete() .verify(Duration.ofSeconds(30)); - awaitForHttpClientCompletion(latch1); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -742,19 +743,6 @@ private ConnectionObserver observeDisconnect(AtomicReference lat }; } - private void awaitForHttpClientCompletion(CountDownLatch latch) throws InterruptedException { - // Wait for HttpClient response to be complete - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - - // since Netty PR #9489, channel listeners execution are not synchronous anymore and are rescheduled in event loop queues. - // So, to prevent situation where an HttpClient response is available while any server pending listeners are not yet executed, - // we need to ensure that all event loop are idle. This is necessary, else we may start to test metrics while they are not up-to-date - CountDownLatch idleEventLoopslatch = new CountDownLatch(eventLoops.size()); - eventLoops.forEach(el -> el.execute(idleEventLoopslatch::countDown)); - idleEventLoopslatch.await(30, TimeUnit.SECONDS); - assertThat(idleEventLoopslatch.await(30, TimeUnit.SECONDS)).as("event loop idleness checker task failed").isTrue(); - } - private void checkServerConnectionsMicrometer(HttpServerRequest request) { String address = formatSocketAddress(request.hostAddress()); boolean isHttp2 = request.requestHeaders().contains(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text()); @@ -877,11 +865,24 @@ private void checkExpectationsBadRequest(String serverAddress, boolean checkTls) } HttpServer customizeServerOptions(HttpServer httpServer, @Nullable ProtocolSslContextSpec ctx, HttpProtocol[] protocols) { - return ctx == null ? httpServer.protocol(protocols) : httpServer.protocol(protocols).secure(spec -> spec.sslContext(ctx)); + return customizeServerOptions(httpServer, ctx, protocols, null, false); + } + + HttpServer customizeServerOptions(HttpServer httpServer, @Nullable ProtocolSslContextSpec ctx, HttpProtocol[] protocols, + @Nullable AtomicReference latchRef, boolean registerLatchHandlerOnConnection) { + HttpServer server = ctx == null ? httpServer.protocol(protocols) : httpServer.protocol(protocols).secure(spec -> spec.sslContext(ctx)); + if (latchRef != null) { + RequestCompletedHandler handler = RequestCompletedHandler.INSTANCE; + handler.reset(latchRef); + server = registerLatchHandlerOnConnection ? + server.doOnConnection(connection -> connection.channel().pipeline().addLast(handler)) : + server.doOnChannelInit((connectionObserver, channel, socketAddress) -> channel.pipeline().addLast(handler)); + } + + return server; } HttpClient customizeClientOptions(HttpClient httpClient, @Nullable ProtocolSslContextSpec ctx, HttpProtocol[] protocols) { - httpClient.doOnConnected(connection -> eventLoops.add(connection.channel().executor())); return ctx == null ? httpClient.protocol(protocols) : httpClient.protocol(protocols).secure(spec -> spec.sslContext(ctx)); } @@ -932,7 +933,7 @@ void checkGauge(String name, boolean exists, double expectedCount, String... tag * @param protocol the protocol used (for HTTP11, we expect to observe 4 disconnect events, and for other (H2/H2C), we expect 3 events)). * @return number of disconnect events we expect to observe on a given connection */ - int getExpectedCloses(HttpProtocol protocol) { + int getExpectedDiconnects(HttpProtocol protocol) { return switch (protocol) { case H2, H2C -> 3; case HTTP11 -> 4; @@ -1208,4 +1209,31 @@ public void recordConnectTime(SocketAddress socketAddress, Duration duration, St public void recordResolveAddressTime(SocketAddress socketAddress, Duration duration, String s) { } } + + /** + * Handler used to ensure that the request has completed on the server. + */ + static final class RequestCompletedHandler extends ChannelHandlerAdapter { + static final RequestCompletedHandler INSTANCE = new RequestCompletedHandler(); + private AtomicReference latchRef = new AtomicReference<>(null); + + void reset(AtomicReference latchRef) { + this.latchRef = latchRef; + } + + @Override + public Future write(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof LastHttpContent) { + return ctx.write(msg).addListener(future -> latchRef.get().countDown()); + } + else { + return ctx.write(msg); + } + } + + @Override + public boolean isSharable() { + return true; // A server may accept multiple connections, hence this handler must be sharable + } + } }