From 0871067e6780bfe27e2873c0415fc8281a675131 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Tue, 28 Jun 2022 14:06:45 +0200 Subject: [PATCH 01/14] Reenable HttpMetricsHandlerTest in netty5 branch --- .../AbstractHttpClientMetricsHandler.java | 4 + .../netty/http/client/HttpTrafficHandler.java | 6 +- .../AbstractHttpServerMetricsHandler.java | 33 +++++--- .../netty/http/HttpMetricsHandlerTests.java | 75 ++++++++++++------- 4 files changed, 78 insertions(+), 40 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java index 67f4e73f0d..5634226f11 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java @@ -20,6 +20,7 @@ import io.netty5.channel.Channel; import io.netty5.channel.ChannelHandlerAdapter; import io.netty5.channel.ChannelHandlerContext; +import io.netty5.handler.codec.http.HttpContent; import io.netty5.handler.codec.http.HttpRequest; import io.netty5.handler.codec.http.HttpResponse; import io.netty5.handler.codec.http.LastHttpContent; @@ -165,6 +166,9 @@ private long extractProcessedDataFromBuffer(Object msg) { else if (msg instanceof Buffer) { return ((Buffer) msg).readableBytes(); } + else if (msg instanceof HttpContent) { + return ((HttpContent) msg).payload().readableBytes(); + } return 0; } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpTrafficHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpTrafficHandler.java index 58d7a79429..ac8ca606aa 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpTrafficHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpTrafficHandler.java @@ -82,6 +82,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { @Override public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) { Channel channel = ctx.channel(); + boolean removeThisHandler = false; if (evt == UPGRADE_ISSUED) { if (log.isDebugEnabled()) { log.debug(format(channel, "An upgrade request was sent to the server.")); @@ -92,7 +93,7 @@ else if (evt == UPGRADE_SUCCESSFUL) { log.debug(format(channel, "The upgrade to H2C protocol was successful.")); } sendNewState(Connection.from(channel), HttpClientState.UPGRADE_SUCCESSFUL); - ctx.pipeline().remove(this); + removeThisHandler = true; // we have to remove ourself from the pipleline after having fired the event below. } else if (evt == UPGRADE_REJECTED) { if (log.isDebugEnabled()) { @@ -101,6 +102,9 @@ else if (evt == UPGRADE_REJECTED) { sendNewState(Connection.from(channel), HttpClientState.UPGRADE_REJECTED); } ctx.fireChannelInboundEvent(evt); + if (removeThisHandler) { + ctx.pipeline().remove(this); + } } void sendNewState(Connection connection, ConnectionObserver.State state) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java index 14f7e21497..68821d64de 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java @@ -19,6 +19,7 @@ import io.netty5.buffer.api.Buffer; import io.netty5.channel.ChannelHandlerAdapter; import io.netty5.channel.ChannelHandlerContext; +import io.netty5.handler.codec.http.HttpContent; import io.netty5.handler.codec.http.HttpRequest; import io.netty5.handler.codec.http.HttpResponse; import io.netty5.handler.codec.http.HttpResponseStatus; @@ -114,25 +115,34 @@ public Future write(ChannelHandlerContext ctx, Object msg) { dataSent += extractProcessedDataFromBuffer(msg); if (msg instanceof LastHttpContent) { + // The listeners are now invoked asynchronously (see https://github.com/netty/netty/pull/9489), + // and it seems we need to first obtain the channelOps, which may not be present anymore + // when the listener will be invoked. + ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); return ctx.write(msg) .addListener(future -> { - try { - ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); if (channelOps instanceof HttpServerOperations ops) { - recordWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), - ops.method().name(), ops.status().codeAsText().toString()); + try { + recordWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), + ops.method().name(), ops.status().codeAsText().toString()); + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem + } if (!ops.isHttp2() && ops.hostAddress() != null) { // This metric is not applicable for HTTP/2 // ops.hostAddress() == null when request decoding failed, in this case // we do not report active connection, so we do not report inactive connection - recordInactiveConnection(ops); + try { + recordInactiveConnection(ops); + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem + } } } - } - catch (RuntimeException e) { - log.warn("Exception caught while recording metrics.", e); - // Allow request-response exchange to continue, unaffected by metrics problem - } dataSent = 0; }); @@ -202,6 +212,9 @@ private long extractProcessedDataFromBuffer(Object msg) { else if (msg instanceof Buffer) { return ((Buffer) msg).readableBytes(); } + else if (msg instanceof HttpContent) { + return ((HttpContent) msg).payload().readableBytes(); + } return 0; } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index e93eb74306..63ff8444bb 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -93,7 +93,6 @@ /** * @author Violeta Georgieva */ -@Disabled class HttpMetricsHandlerTests extends BaseHttpTest { HttpServer httpServer; private ConnectionProvider provider; @@ -171,8 +170,9 @@ void tearDown() { @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { - CountDownLatch latch1 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { + int expectedCloses = getExpectedCloses(negociatedProtocol); + CountDownLatch latch1 = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -216,7 +216,7 @@ else if (clientProtocols.length == 2 && checkExpectationsExisting("/1", sa.getHostString() + ":" + sa.getPort(), 1, serverCtx != null, numWrites[0], bytesWrite[0]); - CountDownLatch latch2 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events + CountDownLatch latch2 = new CountDownLatch(expectedCloses); latchRef.set(latch2); StepVerifier.create(httpClient.post() @@ -241,7 +241,7 @@ else if (clientProtocols.length == 2 && @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) { disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .metrics(true, id -> { throw new IllegalArgumentException("Testcase injected Exception"); @@ -265,7 +265,7 @@ void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) { disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .bindNow(); @@ -287,11 +287,11 @@ void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { // For HTTP11, we expect to observe 2 DISCONNECTS for client, and 2 DISCONNECT for server. // Else, we expect to observe 2 DISCONNECTS for client, and 1 DISCONNECT for server. boolean isHTTP11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11; - int expectedDisconnects = isHTTP11 ? 4 : 3; + int expectedDisconnects = getExpectedCloses(negociatedProtocol); CountDownLatch latch = new CountDownLatch(expectedDisconnects); AtomicReference latchRef = new AtomicReference<>(latch); @@ -369,8 +369,9 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { - CountDownLatch latch1 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { + int expectedCloses = getExpectedCloses(negociatedProtocol); + CountDownLatch latch1 = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -420,8 +421,9 @@ else if (clientProtocols.length == 2 && @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { - CountDownLatch latch1 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { + int expectedCloses = getExpectedCloses(negociatedProtocol); + CountDownLatch latch1 = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -473,7 +475,7 @@ else if (clientProtocols.length == 2 && checkExpectationsExisting("testUriTagValueFunctionNotShared_1", sa.getHostString() + ":" + sa.getPort(), 1, serverCtx != null, numWrites[0], bytesWrite[0]); - CountDownLatch latch2 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events + CountDownLatch latch2 = new CountDownLatch(expectedCloses); latchRef.set(latch2); httpClient.metrics(true, s -> "testUriTagValueFunctionNotShared_2") @@ -499,7 +501,7 @@ else if (clientProtocols.length == 2 && @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).bindNow(); ClientContextAwareRecorder recorder = ClientContextAwareRecorder.INSTANCE; @@ -530,7 +532,7 @@ void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtoc @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { ServerContextAwareRecorder recorder = ServerContextAwareRecorder.INSTANCE; disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).metrics(true, () -> recorder) @@ -562,8 +564,9 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { - CountDownLatch latch = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { + int expectedCloses = getExpectedCloses(negociatedProtocol); + CountDownLatch latch = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -618,7 +621,7 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { // Invoke ServerRecorder.INSTANCE.reset() here as disposableServer.dispose (AfterEach) might be invoked after // ServerRecorder.INSTANCE.reset() (AfterEach) and thus leave ServerRecorder.INSTANCE in a bad state ServerRecorder.INSTANCE.reset(); @@ -905,6 +908,20 @@ void checkGauge(String name, boolean exists, double expectedCount, String... tag } } + /** + * Get number of disconnect events we expect to observe on a given connection. + * @param protocol the protocol used (for HTTP11, we expect to observe 4 disconnect events, and for other (H2/H2C), we expect 3 events)). + * @return number of disconnect events we expect to observe on a given connection + */ + int getExpectedCloses(HttpProtocol protocol) { + return switch (protocol) { + case H2 -> 3; + case H2C -> 3; + case HTTP11 -> 4; + default -> throw new IllegalArgumentException("unexpected protocol"); + }; + } + static Stream http11CompatibleProtocols() { return Stream.of( Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null), @@ -918,23 +935,23 @@ static Stream http11CompatibleProtocols() { static Stream httpCompatibleProtocols() { return Stream.of( - Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null), + Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null, HttpProtocol.HTTP11), Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, - Named.of("Http11SslContextSpec", serverCtx11), Named.of("Http11SslContextSpec", clientCtx11)), + Named.of("Http11SslContextSpec", serverCtx11), Named.of("Http11SslContextSpec", clientCtx11), HttpProtocol.HTTP11), Arguments.of(new HttpProtocol[]{HttpProtocol.H2}, new HttpProtocol[]{HttpProtocol.H2}, - Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2), Arguments.of(new HttpProtocol[]{HttpProtocol.H2}, new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, - Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2), Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, - Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http11SslContextSpec", clientCtx11)), + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http11SslContextSpec", clientCtx11), HttpProtocol.HTTP11), Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2}, - Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2), Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, - Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2)), - Arguments.of(new HttpProtocol[]{HttpProtocol.H2C}, new HttpProtocol[]{HttpProtocol.H2C}, null, null), - Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null), - Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C}, null, null), - Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, null, null) + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C}, new HttpProtocol[]{HttpProtocol.H2C}, null, null, HttpProtocol.H2C), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null, HttpProtocol.HTTP11), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C}, null, null, HttpProtocol.H2C), + Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, null, null, HttpProtocol.H2C) ); } From f58c4330088d9cd3d5400bae60f4e331d8aeb4f5 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Tue, 28 Jun 2022 18:25:05 +0200 Subject: [PATCH 02/14] polish --- .../netty/http/HttpMetricsHandlerTests.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index 4829319944..47e4c5221c 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -240,7 +240,8 @@ else if (clientProtocols.length == 2 && @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, + @SuppressWarnings("unused") HttpProtocol negociatedProtocol) { disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .metrics(true, id -> { throw new IllegalArgumentException("Testcase injected Exception"); @@ -264,7 +265,8 @@ void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, + @SuppressWarnings("unused") HttpProtocol negociatedProtocol) { disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .bindNow(); @@ -289,7 +291,6 @@ void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clie @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { // For HTTP11, we expect to observe 2 DISCONNECTS for client, and 2 DISCONNECT for server. // Else, we expect to observe 2 DISCONNECTS for client, and 1 DISCONNECT for server. - boolean isHTTP11 = clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11; int expectedDisconnects = getExpectedCloses(negociatedProtocol); CountDownLatch latch = new CountDownLatch(expectedDisconnects); @@ -500,7 +501,8 @@ else if (clientProtocols.length == 2 && @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, + @SuppressWarnings("unused") HttpProtocol negociatedProtocol) throws Exception { disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).bindNow(); ClientContextAwareRecorder recorder = ClientContextAwareRecorder.INSTANCE; @@ -531,7 +533,8 @@ void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtoc @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, + @SuppressWarnings("unused") HttpProtocol negociatedProtocol) throws Exception { ServerContextAwareRecorder recorder = ServerContextAwareRecorder.INSTANCE; disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).metrics(true, () -> recorder) @@ -620,7 +623,8 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, + @SuppressWarnings("unused") HttpProtocol negociatedProtocol) throws Exception { // Invoke ServerRecorder.INSTANCE.reset() here as disposableServer.dispose (AfterEach) might be invoked after // ServerRecorder.INSTANCE.reset() (AfterEach) and thus leave ServerRecorder.INSTANCE in a bad state ServerRecorder.INSTANCE.reset(); @@ -914,10 +918,8 @@ void checkGauge(String name, boolean exists, double expectedCount, String... tag */ int getExpectedCloses(HttpProtocol protocol) { return switch (protocol) { - case H2 -> 3; - case H2C -> 3; + case H2, H2C -> 3; case HTTP11 -> 4; - default -> throw new IllegalArgumentException("unexpected protocol"); }; } From 8b120f1242200e8425a03694d46f69cc15c002b8 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Tue, 28 Jun 2022 19:17:13 +0200 Subject: [PATCH 03/14] fixed spelling mistake --- .../netty/http/client/HttpTrafficHandler.java | 2 +- .../netty/http/HttpMetricsHandlerTests.java | 30 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpTrafficHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpTrafficHandler.java index ac8ca606aa..8b0b50b84a 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpTrafficHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpTrafficHandler.java @@ -93,7 +93,7 @@ else if (evt == UPGRADE_SUCCESSFUL) { log.debug(format(channel, "The upgrade to H2C protocol was successful.")); } sendNewState(Connection.from(channel), HttpClientState.UPGRADE_SUCCESSFUL); - removeThisHandler = true; // we have to remove ourself from the pipleline after having fired the event below. + removeThisHandler = true; // we have to remove ourselves from the pipleline after having fired the event below. } else if (evt == UPGRADE_REJECTED) { if (log.isDebugEnabled()) { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index 47e4c5221c..de3c9868ba 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -169,8 +169,8 @@ void tearDown() { @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { - int expectedCloses = getExpectedCloses(negociatedProtocol); + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { + int expectedCloses = getExpectedCloses(negotiatedProtocol); CountDownLatch latch1 = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -241,7 +241,7 @@ else if (clientProtocols.length == 2 && @MethodSource("httpCompatibleProtocols") void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, - @SuppressWarnings("unused") HttpProtocol negociatedProtocol) { + @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .metrics(true, id -> { throw new IllegalArgumentException("Testcase injected Exception"); @@ -266,7 +266,7 @@ void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] @MethodSource("httpCompatibleProtocols") void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, - @SuppressWarnings("unused") HttpProtocol negociatedProtocol) { + @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .bindNow(); @@ -288,10 +288,10 @@ void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { // For HTTP11, we expect to observe 2 DISCONNECTS for client, and 2 DISCONNECT for server. // Else, we expect to observe 2 DISCONNECTS for client, and 1 DISCONNECT for server. - int expectedDisconnects = getExpectedCloses(negociatedProtocol); + int expectedDisconnects = getExpectedCloses(negotiatedProtocol); CountDownLatch latch = new CountDownLatch(expectedDisconnects); AtomicReference latchRef = new AtomicReference<>(latch); @@ -369,8 +369,8 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { - int expectedCloses = getExpectedCloses(negociatedProtocol); + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { + int expectedCloses = getExpectedCloses(negotiatedProtocol); CountDownLatch latch1 = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -421,8 +421,8 @@ else if (clientProtocols.length == 2 && @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { - int expectedCloses = getExpectedCloses(negociatedProtocol); + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { + int expectedCloses = getExpectedCloses(negotiatedProtocol); CountDownLatch latch1 = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -502,7 +502,7 @@ else if (clientProtocols.length == 2 && @MethodSource("httpCompatibleProtocols") void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, - @SuppressWarnings("unused") HttpProtocol negociatedProtocol) throws Exception { + @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) throws Exception { disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).bindNow(); ClientContextAwareRecorder recorder = ClientContextAwareRecorder.INSTANCE; @@ -534,7 +534,7 @@ void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtoc @MethodSource("httpCompatibleProtocols") void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, - @SuppressWarnings("unused") HttpProtocol negociatedProtocol) throws Exception { + @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) throws Exception { ServerContextAwareRecorder recorder = ServerContextAwareRecorder.INSTANCE; disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).metrics(true, () -> recorder) @@ -566,8 +566,8 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negociatedProtocol) throws Exception { - int expectedCloses = getExpectedCloses(negociatedProtocol); + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { + int expectedCloses = getExpectedCloses(negotiatedProtocol); CountDownLatch latch = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -624,7 +624,7 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco @MethodSource("httpCompatibleProtocols") void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, - @SuppressWarnings("unused") HttpProtocol negociatedProtocol) throws Exception { + @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) throws Exception { // Invoke ServerRecorder.INSTANCE.reset() here as disposableServer.dispose (AfterEach) might be invoked after // ServerRecorder.INSTANCE.reset() (AfterEach) and thus leave ServerRecorder.INSTANCE in a bad state ServerRecorder.INSTANCE.reset(); From d30d752cd07aef3968446969961a155caadcbe34 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Tue, 28 Jun 2022 19:30:06 +0200 Subject: [PATCH 04/14] removed a comment in test. fixed again spelling mistake. --- .../main/java/reactor/netty/http/client/HttpTrafficHandler.java | 2 +- .../test/java/reactor/netty/http/HttpMetricsHandlerTests.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpTrafficHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpTrafficHandler.java index 8b0b50b84a..3239c5bd64 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpTrafficHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpTrafficHandler.java @@ -93,7 +93,7 @@ else if (evt == UPGRADE_SUCCESSFUL) { log.debug(format(channel, "The upgrade to H2C protocol was successful.")); } sendNewState(Connection.from(channel), HttpClientState.UPGRADE_SUCCESSFUL); - removeThisHandler = true; // we have to remove ourselves from the pipleline after having fired the event below. + removeThisHandler = true; // we have to remove ourselves from the pipeline after having fired the event below. } else if (evt == UPGRADE_REJECTED) { if (log.isDebugEnabled()) { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index de3c9868ba..2381a8c6b8 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -289,8 +289,6 @@ void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] @MethodSource("httpCompatibleProtocols") void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { - // For HTTP11, we expect to observe 2 DISCONNECTS for client, and 2 DISCONNECT for server. - // Else, we expect to observe 2 DISCONNECTS for client, and 1 DISCONNECT for server. int expectedDisconnects = getExpectedCloses(negotiatedProtocol); CountDownLatch latch = new CountDownLatch(expectedDisconnects); From 6ece59a5a5c9924b2e43e5be45f9c9f9b7bface2 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Tue, 28 Jun 2022 20:07:52 +0200 Subject: [PATCH 05/14] temporarily disable testRecordingFailsServerSide and testRecordingFailsClientSide tests --- .../test/java/reactor/netty/http/HttpMetricsHandlerTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index 2381a8c6b8..ff0f40b7c5 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Named; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -239,6 +240,7 @@ else if (clientProtocols.length == 2 && // https://github.com/reactor/reactor-netty/issues/2187 @ParameterizedTest @MethodSource("httpCompatibleProtocols") + @Disabled void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { @@ -264,6 +266,7 @@ void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] // https://github.com/reactor/reactor-netty/issues/2187 @ParameterizedTest @MethodSource("httpCompatibleProtocols") + @Disabled void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { From cee61dca92abd36886449ddd68bec55caf4bf5eb Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Wed, 29 Jun 2022 14:28:27 +0200 Subject: [PATCH 06/14] use instanceof pattern matching. --- .../netty/http/client/AbstractHttpClientMetricsHandler.java | 4 ++-- .../netty/http/server/AbstractHttpServerMetricsHandler.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java index f57b2c485e..e838dab8d1 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java @@ -166,8 +166,8 @@ private long extractProcessedDataFromBuffer(Object msg) { else if (msg instanceof Buffer buffer) { return buffer.readableBytes(); } - else if (msg instanceof HttpContent) { - return ((HttpContent) msg).payload().readableBytes(); + else if (msg instanceof HttpContent httpContent) { + return httpContent.payload().readableBytes(); } return 0; } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java index 5262ed5576..8e9abe2c5b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java @@ -212,8 +212,8 @@ private long extractProcessedDataFromBuffer(Object msg) { else if (msg instanceof Buffer buffer) { return buffer.readableBytes(); } - else if (msg instanceof HttpContent) { - return ((HttpContent) msg).payload().readableBytes(); + else if (msg instanceof HttpContent httpContent) { + return httpContent.payload().readableBytes(); } return 0; } From 01e4da8a81266ee8ba5d248b64299e12150476ea Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Wed, 29 Jun 2022 15:00:05 +0200 Subject: [PATCH 07/14] Temporary traces in order to investigate failed test. --- .../java/reactor/netty/http/HttpMetricsHandlerTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index ff0f40b7c5..8e2c3cbe82 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -240,7 +240,6 @@ else if (clientProtocols.length == 2 && // https://github.com/reactor/reactor-netty/issues/2187 @ParameterizedTest @MethodSource("httpCompatibleProtocols") - @Disabled void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { @@ -266,7 +265,6 @@ void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] // https://github.com/reactor/reactor-netty/issues/2187 @ParameterizedTest @MethodSource("httpCompatibleProtocols") - @Disabled void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { @@ -294,6 +292,10 @@ void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clie @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { int expectedDisconnects = getExpectedCloses(negotiatedProtocol); + System.out.println("XX: testNonExistingEndpoint: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + + Arrays.toString(clientProtocols) + ", negotiatedProtocol=" + negotiatedProtocol + + ", expectedDisconnects=" + expectedDisconnects); + CountDownLatch latch = new CountDownLatch(expectedDisconnects); AtomicReference latchRef = new AtomicReference<>(latch); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); From a87c6611f523c4c07c65a34b1690b41c2fcd59aa Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Wed, 29 Jun 2022 15:13:10 +0200 Subject: [PATCH 08/14] added missing type argument when using instanceof pattern matching --- .../netty/http/client/AbstractHttpClientMetricsHandler.java | 2 +- .../netty/http/server/AbstractHttpServerMetricsHandler.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java index e838dab8d1..d14cefeebf 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java @@ -166,7 +166,7 @@ private long extractProcessedDataFromBuffer(Object msg) { else if (msg instanceof Buffer buffer) { return buffer.readableBytes(); } - else if (msg instanceof HttpContent httpContent) { + else if (msg instanceof HttpContent httpContent) { return httpContent.payload().readableBytes(); } return 0; diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java index 8e9abe2c5b..a75fda28e4 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java @@ -212,7 +212,7 @@ private long extractProcessedDataFromBuffer(Object msg) { else if (msg instanceof Buffer buffer) { return buffer.readableBytes(); } - else if (msg instanceof HttpContent httpContent) { + else if (msg instanceof HttpContent httpContent) { return httpContent.payload().readableBytes(); } return 0; From ed34846abb4645eb047d2f993698fa12659cf931 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Wed, 29 Jun 2022 15:29:30 +0200 Subject: [PATCH 09/14] temporary debug changes --- .github/workflows/check_transport.yml | 2 +- .../java/reactor/netty/http/HttpMetricsHandlerTests.java | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/check_transport.yml b/.github/workflows/check_transport.yml index 0447bd4f91..ce8283f671 100644 --- a/.github/workflows/check_transport.yml +++ b/.github/workflows/check_transport.yml @@ -92,5 +92,5 @@ jobs: run: ./mvnw install -DskipTests=true working-directory: ./socks-proxy - name: Build with Gradle - run: ./gradlew clean check --no-daemon -PforceTransport=${{ matrix.transport }} -PspotlessFrom=origin/netty5 + run: ./gradlew clean check -i --no-daemon -PforceTransport=${{ matrix.transport }} -PspotlessFrom=origin/netty5 working-directory: ./reactor-netty \ No newline at end of file diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index 8e2c3cbe82..f96a19f387 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -32,7 +32,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Named; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -52,6 +51,8 @@ import reactor.netty.resources.ConnectionProvider; import reactor.netty.tcp.SslProvider.ProtocolSslContextSpec; import reactor.test.StepVerifier; +import reactor.util.Logger; +import reactor.util.Loggers; import reactor.util.annotation.Nullable; import reactor.util.context.Context; import reactor.util.context.ContextView; @@ -107,6 +108,8 @@ class HttpMetricsHandlerTests extends BaseHttpTest { static Http11SslContextSpec clientCtx11; static Http2SslContextSpec clientCtx2; + static final Logger log = Loggers.getLogger(HttpMetricsHandlerTests.class); + @BeforeAll static void createSelfSignedCertificate() throws CertificateException { ssc = new SelfSignedCertificate(); @@ -292,7 +295,7 @@ void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clie @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { int expectedDisconnects = getExpectedCloses(negotiatedProtocol); - System.out.println("XX: testNonExistingEndpoint: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + + log.warn("XX: testNonExistingEndpoint: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + Arrays.toString(clientProtocols) + ", negotiatedProtocol=" + negotiatedProtocol + ", expectedDisconnects=" + expectedDisconnects); From 176e409a9e8c07e4018328a5e318047ea6342008 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Wed, 29 Jun 2022 16:35:36 +0200 Subject: [PATCH 10/14] debug --- .../netty/http/HttpMetricsHandlerTests.java | 37 +++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index f96a19f387..0ca7365e7f 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -175,6 +175,9 @@ void tearDown() { void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { int expectedCloses = getExpectedCloses(negotiatedProtocol); + log.warn("XX: testExistingEndpoint: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + + Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol + + ", expectedDisconnects=" + expectedCloses); CountDownLatch latch1 = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -246,6 +249,8 @@ else if (clientProtocols.length == 2 && void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { + log.warn("XX: testRecordingFailsServerSide: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + + Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol); disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .metrics(true, id -> { throw new IllegalArgumentException("Testcase injected Exception"); @@ -271,6 +276,8 @@ void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { + log.warn("XX: testRecordingFailsClientSide: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + + Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol); disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .bindNow(); @@ -292,11 +299,10 @@ void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { int expectedDisconnects = getExpectedCloses(negotiatedProtocol); - log.warn("XX: testNonExistingEndpoint: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + - Arrays.toString(clientProtocols) + ", negotiatedProtocol=" + negotiatedProtocol + + Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol + ", expectedDisconnects=" + expectedDisconnects); CountDownLatch latch = new CountDownLatch(expectedDisconnects); @@ -377,6 +383,9 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { int expectedCloses = getExpectedCloses(negotiatedProtocol); + log.warn("XX: testUriTagValueFunction: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + + Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol + + ", expectedDisconnects=" + expectedCloses); CountDownLatch latch1 = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -429,6 +438,9 @@ else if (clientProtocols.length == 2 && void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { int expectedCloses = getExpectedCloses(negotiatedProtocol); + log.warn("XX: testUriTagValueFunctionNotSharedForClient: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + + Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol + + ", expectedDisconnects=" + expectedCloses); CountDownLatch latch1 = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -509,6 +521,8 @@ else if (clientProtocols.length == 2 && void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) throws Exception { + log.warn("XX: testContextAwareRecorderOnClient: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + + Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol); disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).bindNow(); ClientContextAwareRecorder recorder = ClientContextAwareRecorder.INSTANCE; @@ -541,6 +555,8 @@ void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtoc void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) throws Exception { + log.warn("XX: testContextAwareRecorderOnServer: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + + Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol); ServerContextAwareRecorder recorder = ServerContextAwareRecorder.INSTANCE; disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).metrics(true, () -> recorder) @@ -574,6 +590,9 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { int expectedCloses = getExpectedCloses(negotiatedProtocol); + log.warn("XX: testServerConnectionsMicrometer: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + + Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol + + ", expectedDisconnects=" + expectedCloses); CountDownLatch latch = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -631,6 +650,8 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) throws Exception { + log.warn("XX: testServerConnectionsRecorder: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + + Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol); // Invoke ServerRecorder.INSTANCE.reset() here as disposableServer.dispose (AfterEach) might be invoked after // ServerRecorder.INSTANCE.reset() (AfterEach) and thus leave ServerRecorder.INSTANCE in a bad state ServerRecorder.INSTANCE.reset(); @@ -680,6 +701,7 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[ @Test void testIssue896() throws Exception { + log.warn("XX: testIssue896"); disposableServer = httpServer.noSSL() .bindNow(); @@ -709,6 +731,8 @@ void testIssue896() throws Exception { @MethodSource("http11CompatibleProtocols") void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { + log.warn("XX: testBadRequest: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + + Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null)); CountDownLatch latch1 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -941,6 +965,13 @@ static Stream http11CompatibleProtocols() { } static Stream httpCompatibleProtocols() { + return Stream.of( + Arguments.of(new HttpProtocol[]{HttpProtocol.H2}, new HttpProtocol[]{HttpProtocol.H2}, + Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2) + ); + } + + static Stream _httpCompatibleProtocols() { return Stream.of( Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null, HttpProtocol.HTTP11), Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, From 1b329470686a02c4f404b7987044f5abb7b80059 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Wed, 29 Jun 2022 16:52:44 +0200 Subject: [PATCH 11/14] removed debug --- .github/workflows/check_transport.yml | 2 +- .../netty/http/HttpMetricsHandlerTests.java | 44 ++----------------- 2 files changed, 5 insertions(+), 41 deletions(-) diff --git a/.github/workflows/check_transport.yml b/.github/workflows/check_transport.yml index ce8283f671..0447bd4f91 100644 --- a/.github/workflows/check_transport.yml +++ b/.github/workflows/check_transport.yml @@ -92,5 +92,5 @@ jobs: run: ./mvnw install -DskipTests=true working-directory: ./socks-proxy - name: Build with Gradle - run: ./gradlew clean check -i --no-daemon -PforceTransport=${{ matrix.transport }} -PspotlessFrom=origin/netty5 + run: ./gradlew clean check --no-daemon -PforceTransport=${{ matrix.transport }} -PspotlessFrom=origin/netty5 working-directory: ./reactor-netty \ No newline at end of file diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index 0ca7365e7f..ff0f40b7c5 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Named; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -51,8 +52,6 @@ import reactor.netty.resources.ConnectionProvider; import reactor.netty.tcp.SslProvider.ProtocolSslContextSpec; import reactor.test.StepVerifier; -import reactor.util.Logger; -import reactor.util.Loggers; import reactor.util.annotation.Nullable; import reactor.util.context.Context; import reactor.util.context.ContextView; @@ -108,8 +107,6 @@ class HttpMetricsHandlerTests extends BaseHttpTest { static Http11SslContextSpec clientCtx11; static Http2SslContextSpec clientCtx2; - static final Logger log = Loggers.getLogger(HttpMetricsHandlerTests.class); - @BeforeAll static void createSelfSignedCertificate() throws CertificateException { ssc = new SelfSignedCertificate(); @@ -175,9 +172,6 @@ void tearDown() { void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { int expectedCloses = getExpectedCloses(negotiatedProtocol); - log.warn("XX: testExistingEndpoint: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + - Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol + - ", expectedDisconnects=" + expectedCloses); CountDownLatch latch1 = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -246,11 +240,10 @@ else if (clientProtocols.length == 2 && // https://github.com/reactor/reactor-netty/issues/2187 @ParameterizedTest @MethodSource("httpCompatibleProtocols") + @Disabled void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { - log.warn("XX: testRecordingFailsServerSide: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + - Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol); disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .metrics(true, id -> { throw new IllegalArgumentException("Testcase injected Exception"); @@ -273,11 +266,10 @@ void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] // https://github.com/reactor/reactor-netty/issues/2187 @ParameterizedTest @MethodSource("httpCompatibleProtocols") + @Disabled void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { - log.warn("XX: testRecordingFailsClientSide: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + - Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol); disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) .bindNow(); @@ -299,11 +291,8 @@ void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, - @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { int expectedDisconnects = getExpectedCloses(negotiatedProtocol); - log.warn("XX: testNonExistingEndpoint: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + - Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol + - ", expectedDisconnects=" + expectedDisconnects); CountDownLatch latch = new CountDownLatch(expectedDisconnects); AtomicReference latchRef = new AtomicReference<>(latch); @@ -383,9 +372,6 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { int expectedCloses = getExpectedCloses(negotiatedProtocol); - log.warn("XX: testUriTagValueFunction: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + - Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol + - ", expectedDisconnects=" + expectedCloses); CountDownLatch latch1 = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -438,9 +424,6 @@ else if (clientProtocols.length == 2 && void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { int expectedCloses = getExpectedCloses(negotiatedProtocol); - log.warn("XX: testUriTagValueFunctionNotSharedForClient: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + - Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol + - ", expectedDisconnects=" + expectedCloses); CountDownLatch latch1 = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -521,8 +504,6 @@ else if (clientProtocols.length == 2 && void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) throws Exception { - log.warn("XX: testContextAwareRecorderOnClient: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + - Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol); disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).bindNow(); ClientContextAwareRecorder recorder = ClientContextAwareRecorder.INSTANCE; @@ -555,8 +536,6 @@ void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtoc void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) throws Exception { - log.warn("XX: testContextAwareRecorderOnServer: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + - Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol); ServerContextAwareRecorder recorder = ServerContextAwareRecorder.INSTANCE; disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols).metrics(true, () -> recorder) @@ -590,9 +569,6 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, HttpProtocol negotiatedProtocol) throws Exception { int expectedCloses = getExpectedCloses(negotiatedProtocol); - log.warn("XX: testServerConnectionsMicrometer: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + - Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol + - ", expectedDisconnects=" + expectedCloses); CountDownLatch latch = new CountDownLatch(expectedCloses); AtomicReference latchRef = new AtomicReference<>(latch); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -650,8 +626,6 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) throws Exception { - log.warn("XX: testServerConnectionsRecorder: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + - Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null) + ", negotiatedProtocol=" + negotiatedProtocol); // Invoke ServerRecorder.INSTANCE.reset() here as disposableServer.dispose (AfterEach) might be invoked after // ServerRecorder.INSTANCE.reset() (AfterEach) and thus leave ServerRecorder.INSTANCE in a bad state ServerRecorder.INSTANCE.reset(); @@ -701,7 +675,6 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[ @Test void testIssue896() throws Exception { - log.warn("XX: testIssue896"); disposableServer = httpServer.noSSL() .bindNow(); @@ -731,8 +704,6 @@ void testIssue896() throws Exception { @MethodSource("http11CompatibleProtocols") void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { - log.warn("XX: testBadRequest: serverProtocols=" + Arrays.toString(serverProtocols) + ", clientProtocols=" + - Arrays.toString(clientProtocols) + ", serverSSL=" + (serverCtx != null) + ", clientSSL=" + (clientCtx != null)); CountDownLatch latch1 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events AtomicReference latchRef = new AtomicReference<>(latch1); ConnectionObserver observerDisconnect = observeDisconnect(latchRef); @@ -965,13 +936,6 @@ static Stream http11CompatibleProtocols() { } static Stream httpCompatibleProtocols() { - return Stream.of( - Arguments.of(new HttpProtocol[]{HttpProtocol.H2}, new HttpProtocol[]{HttpProtocol.H2}, - Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http2SslContextSpec", clientCtx2), HttpProtocol.H2) - ); - } - - static Stream _httpCompatibleProtocols() { return Stream.of( Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null, HttpProtocol.HTTP11), Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, From 8e29279b0d6f1e96053d6b05ead259136382ff93 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Thu, 30 Jun 2022 10:47:42 +0200 Subject: [PATCH 12/14] since netty #9489 fix, event listeners are now rescheduled in event loops queue. this means we should ensure that all event loops are idle before testing metrics --- .../netty/http/HttpMetricsHandlerTests.java | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index ff0f40b7c5..2b5782f791 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -25,6 +25,7 @@ import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.core.tck.MeterRegistryAssert; import io.netty5.buffer.api.Buffer; +import io.netty5.channel.EventLoop; import io.netty5.handler.codec.http2.HttpConversionUtil; import io.netty5.handler.ssl.SslProvider; import io.netty5.handler.ssl.util.InsecureTrustManagerFactory; @@ -32,7 +33,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Named; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -62,6 +62,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -106,6 +107,7 @@ class HttpMetricsHandlerTests extends BaseHttpTest { static Http2SslContextSpec serverCtx2; static Http11SslContextSpec clientCtx11; static Http2SslContextSpec clientCtx2; + static final ConcurrentLinkedQueue eventLoops = new ConcurrentLinkedQueue<>(); @BeforeAll static void createSelfSignedCertificate() throws CertificateException { @@ -138,6 +140,9 @@ void setUp() { .host("127.0.0.1") .metrics(true, Function.identity()) .httpRequestDecoder(spec -> spec.h2cMaxContentLength(256)) + .doOnConnection(c -> { + eventLoops.add(c.channel().executor()); + }) .route(r -> r.post("/1", (req, res) -> res.header("Connection", "close") .send(req.receive().transferOwnership().delayElements(Duration.ofMillis(10)))) .post("/2", (req, res) -> res.header("Connection", "close") @@ -159,6 +164,7 @@ void setUp() { @AfterEach void tearDown() { + eventLoops.clear(); provider.disposeLater() .block(Duration.ofSeconds(30)); @@ -196,6 +202,7 @@ void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientP .verify(Duration.ofSeconds(30)); assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -230,6 +237,7 @@ else if (clientProtocols.length == 2 && .verify(Duration.ofSeconds(30)); assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); sa = (InetSocketAddress) serverAddress.get(); @@ -240,7 +248,6 @@ else if (clientProtocols.length == 2 && // https://github.com/reactor/reactor-netty/issues/2187 @ParameterizedTest @MethodSource("httpCompatibleProtocols") - @Disabled void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { @@ -266,7 +273,6 @@ void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] // https://github.com/reactor/reactor-netty/issues/2187 @ParameterizedTest @MethodSource("httpCompatibleProtocols") - @Disabled void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @SuppressWarnings("unused") HttpProtocol negotiatedProtocol) { @@ -318,6 +324,7 @@ void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clie .verify(Duration.ofSeconds(30)); assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -361,6 +368,7 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. .verify(Duration.ofSeconds(30)); assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); sa = (InetSocketAddress) serverAddress.get(); checkExpectationsNonExisting(sa.getHostString() + ":" + sa.getPort(), connIndex, 2, serverCtx != null, @@ -398,6 +406,7 @@ void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clie .verify(Duration.ofSeconds(30)); assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -459,6 +468,7 @@ void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, H .verify(Duration.ofSeconds(30)); assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -492,6 +502,7 @@ else if (clientProtocols.length == 2 && .verify(Duration.ofSeconds(30)); assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); sa = (InetSocketAddress) serverAddress.get(); @@ -526,6 +537,7 @@ void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtoc .verify(Duration.ofSeconds(30)); assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); assertThat(recorder.onDataReceivedContextView).isTrue(); assertThat(recorder.onDataSentContextView).isTrue(); @@ -559,6 +571,7 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc .verify(Duration.ofSeconds(30)); assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); assertThat(recorder.onDataReceivedContextView).isTrue(); assertThat(recorder.onDataSentContextView).isTrue(); @@ -602,6 +615,7 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco .verify(Duration.ofSeconds(30)); assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); // now check the server counters if (isHttp11) { @@ -659,6 +673,7 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[ .verify(Duration.ofSeconds(30)); assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); assertThat(ServerRecorder.INSTANCE.done.await(30, TimeUnit.SECONDS)).as("recorder latch await").isTrue(); if (isHttp11) { assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0); @@ -692,6 +707,7 @@ void testIssue896() throws Exception { .subscribe(); assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); InetSocketAddress sa = (InetSocketAddress) disposableServer.channel().localAddress(); String serverAddress = sa.getHostString() + ":" + sa.getPort(); @@ -727,6 +743,7 @@ void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtoco .verify(Duration.ofSeconds(30)); assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + ensureEventLoopsAreIdle(); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -741,6 +758,13 @@ private ConnectionObserver observeDisconnect(AtomicReference lat }; } + private void ensureEventLoopsAreIdle() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(eventLoops.size()); + eventLoops.forEach(el -> el.execute(latch::countDown)); + latch.await(30, TimeUnit.SECONDS); + assertThat(latch.await(30, TimeUnit.SECONDS)).as("event loop idleness checker task failed").isTrue(); + } + private void checkServerConnectionsMicrometer(HttpServerRequest request) { String address = formatSocketAddress(request.hostAddress()); boolean isHttp2 = request.requestHeaders().contains(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text()); From 19a5f80e0864680019a63dcb171f7c435c0c2bfd Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Thu, 30 Jun 2022 12:14:20 +0200 Subject: [PATCH 13/14] polish. introduced one awaitForHttpClientCompletion which also ensures that all event loops are idles --- .../netty/http/HttpMetricsHandlerTests.java | 61 ++++++++----------- 1 file changed, 26 insertions(+), 35 deletions(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index 2b5782f791..09ac0d21f7 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -140,9 +140,7 @@ void setUp() { .host("127.0.0.1") .metrics(true, Function.identity()) .httpRequestDecoder(spec -> spec.h2cMaxContentLength(256)) - .doOnConnection(c -> { - eventLoops.add(c.channel().executor()); - }) + .doOnConnection(c -> eventLoops.add(c.channel().executor())) .route(r -> r.post("/1", (req, res) -> res.header("Connection", "close") .send(req.receive().transferOwnership().delayElements(Duration.ofMillis(10)))) .post("/2", (req, res) -> res.header("Connection", "close") @@ -201,8 +199,7 @@ void testExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientP .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); + awaitForHttpClientCompletion(latch1); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -236,9 +233,7 @@ else if (clientProtocols.length == 2 && .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); - + awaitForHttpClientCompletion(latch2); sa = (InetSocketAddress) serverAddress.get(); checkExpectationsExisting("/2", sa.getHostString() + ":" + sa.getPort(), connIndex, serverCtx != null, @@ -323,8 +318,7 @@ void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clie .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); + awaitForHttpClientCompletion(latch); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -367,8 +361,7 @@ else if (protocols.contains(HttpProtocol.H2) || protocols.contains(HttpProtocol. .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); + awaitForHttpClientCompletion(latch2); sa = (InetSocketAddress) serverAddress.get(); checkExpectationsNonExisting(sa.getHostString() + ":" + sa.getPort(), connIndex, 2, serverCtx != null, @@ -405,8 +398,7 @@ void testUriTagValueFunction(HttpProtocol[] serverProtocols, HttpProtocol[] clie .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); + awaitForHttpClientCompletion(latch1); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -467,8 +459,7 @@ void testUriTagValueFunctionNotSharedForClient(HttpProtocol[] serverProtocols, H .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); + awaitForHttpClientCompletion(latch1); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -501,8 +492,7 @@ else if (clientProtocols.length == 2 && .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch2.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); + awaitForHttpClientCompletion(latch2); sa = (InetSocketAddress) serverAddress.get(); @@ -536,8 +526,7 @@ void testContextAwareRecorderOnClient(HttpProtocol[] serverProtocols, HttpProtoc .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); + awaitForHttpClientCompletion(latch); assertThat(recorder.onDataReceivedContextView).isTrue(); assertThat(recorder.onDataSentContextView).isTrue(); @@ -570,8 +559,7 @@ void testContextAwareRecorderOnServer(HttpProtocol[] serverProtocols, HttpProtoc .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); + awaitForHttpClientCompletion(latch); assertThat(recorder.onDataReceivedContextView).isTrue(); assertThat(recorder.onDataSentContextView).isTrue(); @@ -614,8 +602,7 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); + awaitForHttpClientCompletion(latch); // now check the server counters if (isHttp11) { @@ -672,8 +659,7 @@ void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[ .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); + awaitForHttpClientCompletion(latch); assertThat(ServerRecorder.INSTANCE.done.await(30, TimeUnit.SECONDS)).as("recorder latch await").isTrue(); if (isHttp11) { assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0); @@ -706,8 +692,7 @@ void testIssue896() throws Exception { .responseContent() .subscribe(); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); + awaitForHttpClientCompletion(latch); InetSocketAddress sa = (InetSocketAddress) disposableServer.channel().localAddress(); String serverAddress = sa.getHostString() + ":" + sa.getPort(); @@ -742,8 +727,7 @@ void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtoco .expectComplete() .verify(Duration.ofSeconds(30)); - assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); - ensureEventLoopsAreIdle(); + awaitForHttpClientCompletion(latch1); InetSocketAddress sa = (InetSocketAddress) serverAddress.get(); @@ -758,11 +742,17 @@ private ConnectionObserver observeDisconnect(AtomicReference lat }; } - private void ensureEventLoopsAreIdle() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(eventLoops.size()); - eventLoops.forEach(el -> el.execute(latch::countDown)); - latch.await(30, TimeUnit.SECONDS); - assertThat(latch.await(30, TimeUnit.SECONDS)).as("event loop idleness checker task failed").isTrue(); + private void awaitForHttpClientCompletion(CountDownLatch latch) throws InterruptedException { + // Wait for HttpClizent response to be complete + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + + // since Netty PR #9489, channel listeners execution are not synchronous anymore and are rescheduled in event loop queues. + // So, to prevent situation where an HttpClient response is available while any server pending listeners are not yet executed, + // ensure for all event loop idleness. This is necessary, else we may start to test metrics while they are not up-to-date + CountDownLatch idleEventLoopslatch = new CountDownLatch(eventLoops.size()); + eventLoops.forEach(el -> el.execute(idleEventLoopslatch::countDown)); + idleEventLoopslatch.await(30, TimeUnit.SECONDS); + assertThat(idleEventLoopslatch.await(30, TimeUnit.SECONDS)).as("event loop idleness checker task failed").isTrue(); } private void checkServerConnectionsMicrometer(HttpServerRequest request) { @@ -891,6 +881,7 @@ HttpServer customizeServerOptions(HttpServer httpServer, @Nullable ProtocolSslCo } HttpClient customizeClientOptions(HttpClient httpClient, @Nullable ProtocolSslContextSpec ctx, HttpProtocol[] protocols) { + httpClient.doOnConnected(connection -> eventLoops.add(connection.channel().executor())); return ctx == null ? httpClient.protocol(protocols) : httpClient.protocol(protocols).secure(spec -> spec.sslContext(ctx)); } From 13b152abeae33b5514028cec563f945f1cfed698 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Thu, 30 Jun 2022 12:20:48 +0200 Subject: [PATCH 14/14] spelling mistakes --- .../test/java/reactor/netty/http/HttpMetricsHandlerTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index 09ac0d21f7..33ca81f47c 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -743,12 +743,12 @@ private ConnectionObserver observeDisconnect(AtomicReference lat } private void awaitForHttpClientCompletion(CountDownLatch latch) throws InterruptedException { - // Wait for HttpClizent response to be complete + // Wait for HttpClient response to be complete assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); // since Netty PR #9489, channel listeners execution are not synchronous anymore and are rescheduled in event loop queues. // So, to prevent situation where an HttpClient response is available while any server pending listeners are not yet executed, - // ensure for all event loop idleness. This is necessary, else we may start to test metrics while they are not up-to-date + // we need to ensure that all event loop are idle. This is necessary, else we may start to test metrics while they are not up-to-date CountDownLatch idleEventLoopslatch = new CountDownLatch(eventLoops.size()); eventLoops.forEach(el -> el.execute(idleEventLoopslatch::countDown)); idleEventLoopslatch.await(30, TimeUnit.SECONDS);