From 86cb1db66a8737da6cdea146740d8171521bcf49 Mon Sep 17 00:00:00 2001 From: ChristianLMI <75296563+ChristianLMI@users.noreply.github.com> Date: Fri, 24 Jun 2022 11:22:55 +0200 Subject: [PATCH] Ensure errors caused by recording metrics do not affect the operational code (#2237) Fixes #2187 Co-authored-by: Violeta Georgieva --- .../AbstractChannelMetricsHandler.java | 80 ++++++--- .../AddressResolverGroupMetrics.java | 18 +- .../netty/transport/TransportConfig.java | 20 ++- .../TransportEventLoopMetricsTest.java | 50 +++++- .../AbstractHttpClientMetricsHandler.java | 104 ++++++++---- .../AbstractHttpServerMetricsHandler.java | 157 +++++++++++------- .../netty/http/HttpMetricsHandlerTests.java | 47 ++++++ 7 files changed, 346 insertions(+), 130 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java b/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java index 7be7229c52..13213cc432 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java @@ -22,6 +22,8 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.socket.DatagramPacket; import reactor.netty.NettyPipeline; +import reactor.util.Logger; +import reactor.util.Loggers; import reactor.util.annotation.Nullable; import java.net.SocketAddress; @@ -34,6 +36,8 @@ */ public abstract class AbstractChannelMetricsHandler extends ChannelDuplexHandler { + private static final Logger log = Loggers.getLogger(AbstractChannelMetricsHandler.class); + final SocketAddress remoteAddress; final boolean onServer; @@ -46,7 +50,13 @@ protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, b @Override public void channelActive(ChannelHandlerContext ctx) { if (onServer) { - recorder().recordServerConnectionOpened(ctx.channel().localAddress()); + try { + recorder().recordServerConnectionOpened(ctx.channel().localAddress()); + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem + } } ctx.fireChannelActive(); } @@ -54,7 +64,13 @@ public void channelActive(ChannelHandlerContext ctx) { @Override public void channelInactive(ChannelHandlerContext ctx) { if (onServer) { - recorder().recordServerConnectionClosed(ctx.channel().localAddress()); + try { + recorder().recordServerConnectionClosed(ctx.channel().localAddress()); + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem + } } ctx.fireChannelInactive(); } @@ -79,19 +95,25 @@ public void channelRegistered(ChannelHandlerContext ctx) { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { - if (msg instanceof ByteBuf) { - ByteBuf buffer = (ByteBuf) msg; - if (buffer.readableBytes() > 0) { - recordRead(ctx, remoteAddress, buffer.readableBytes()); + try { + if (msg instanceof ByteBuf) { + ByteBuf buffer = (ByteBuf) msg; + if (buffer.readableBytes() > 0) { + recordRead(ctx, remoteAddress, buffer.readableBytes()); + } } - } - else if (msg instanceof DatagramPacket) { - DatagramPacket p = (DatagramPacket) msg; - ByteBuf buffer = p.content(); - if (buffer.readableBytes() > 0) { - recordRead(ctx, remoteAddress != null ? remoteAddress : p.sender(), buffer.readableBytes()); + else if (msg instanceof DatagramPacket) { + DatagramPacket p = (DatagramPacket) msg; + ByteBuf buffer = p.content(); + if (buffer.readableBytes() > 0) { + recordRead(ctx, remoteAddress != null ? remoteAddress : p.sender(), buffer.readableBytes()); + } } } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem + } ctx.fireChannelRead(msg); } @@ -99,19 +121,25 @@ else if (msg instanceof DatagramPacket) { @Override @SuppressWarnings("FutureReturnValueIgnored") public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - if (msg instanceof ByteBuf) { - ByteBuf buffer = (ByteBuf) msg; - if (buffer.readableBytes() > 0) { - recordWrite(ctx, remoteAddress, buffer.readableBytes()); + try { + if (msg instanceof ByteBuf) { + ByteBuf buffer = (ByteBuf) msg; + if (buffer.readableBytes() > 0) { + recordWrite(ctx, remoteAddress, buffer.readableBytes()); + } } - } - else if (msg instanceof DatagramPacket) { - DatagramPacket p = (DatagramPacket) msg; - ByteBuf buffer = p.content(); - if (buffer.readableBytes() > 0) { - recordWrite(ctx, remoteAddress != null ? remoteAddress : p.recipient(), buffer.readableBytes()); + else if (msg instanceof DatagramPacket) { + DatagramPacket p = (DatagramPacket) msg; + ByteBuf buffer = p.content(); + if (buffer.readableBytes() > 0) { + recordWrite(ctx, remoteAddress != null ? remoteAddress : p.recipient(), buffer.readableBytes()); + } } } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem + } //"FutureReturnValueIgnored" this is deliberate ctx.write(msg, promise); @@ -119,7 +147,13 @@ else if (msg instanceof DatagramPacket) { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - recordException(ctx, remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress()); + try { + recordException(ctx, remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress()); + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem + } ctx.fireExceptionCaught(cause); } diff --git a/reactor-netty-core/src/main/java/reactor/netty/transport/AddressResolverGroupMetrics.java b/reactor-netty-core/src/main/java/reactor/netty/transport/AddressResolverGroupMetrics.java index 8445a98f98..efd1f2f09a 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/transport/AddressResolverGroupMetrics.java +++ b/reactor-netty-core/src/main/java/reactor/netty/transport/AddressResolverGroupMetrics.java @@ -22,6 +22,8 @@ import io.netty.util.concurrent.Promise; import reactor.netty.channel.ChannelMetricsRecorder; import reactor.netty.internal.util.MapUtils; +import reactor.util.Logger; +import reactor.util.Loggers; import java.net.SocketAddress; import java.time.Duration; @@ -39,6 +41,8 @@ */ final class AddressResolverGroupMetrics extends AddressResolverGroup { + private static final Logger log = Loggers.getLogger(AddressResolverGroupMetrics.class); + static final ConcurrentMap> cache = new ConcurrentHashMap<>(); static AddressResolverGroupMetrics getOrCreate( @@ -116,10 +120,16 @@ Future> resolveAllInternal(SocketAddress address, Supplier latch.countDown()) + .bindNow(); + + assertThat(server).isNotNull(); + client = TcpClient.create() + .port(server.port()) + .connectNow(); + + assertThat(client).isNotNull(); + assertThat(latch.await(5, TimeUnit.SECONDS)).as("Failed to connect").isTrue(); + } + + finally { + if (client != null) { + client.disposeNow(); + } + if (server != null) { + server.disposeNow(); + } + if (loop != null) { + loop.disposeLater().block(Duration.ofSeconds(10)); + } + } + } + private double getGaugeValue(String name, String... tags) { Gauge gauge = registry.find(name).tags(tags).gauge(); double result = -1; diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java index f74229e87a..19e0b0ca94 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/AbstractHttpClientMetricsHandler.java @@ -24,6 +24,8 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.LastHttpContent; import reactor.netty.channel.ChannelOperations; +import reactor.util.Logger; +import reactor.util.Loggers; import reactor.util.annotation.Nullable; import reactor.util.context.ContextView; @@ -37,6 +39,8 @@ */ abstract class AbstractHttpClientMetricsHandler extends ChannelDuplexHandler { + private static final Logger log = Loggers.getLogger(AbstractHttpClientMetricsHandler.class); + String path; String method; @@ -76,29 +80,29 @@ protected AbstractHttpClientMetricsHandler(AbstractHttpClientMetricsHandler copy @Override @SuppressWarnings("FutureReturnValueIgnored") public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - if (msg instanceof HttpRequest) { - method = ((HttpRequest) msg).method().name(); - - ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); - if (channelOps instanceof HttpClientOperations) { - HttpClientOperations ops = (HttpClientOperations) channelOps; - path = uriTagValue == null ? ops.path : uriTagValue.apply(ops.path); - contextView = ops.currentContextView(); + try { + if (msg instanceof HttpRequest) { + extractDetailsFromHttpRequest(ctx, (HttpRequest) msg); } - dataSentTime = System.nanoTime(); - } - - if (msg instanceof ByteBufHolder) { - dataSent += ((ByteBufHolder) msg).content().readableBytes(); - } - else if (msg instanceof ByteBuf) { - dataSent += ((ByteBuf) msg).readableBytes(); + dataSent += extractProcessedDataFromBuffer(msg); + + if (msg instanceof LastHttpContent) { + SocketAddress address = ctx.channel().remoteAddress(); + promise.addListener(future -> { + try { + recordWrite(address); + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem + } + }); + } } - - if (msg instanceof LastHttpContent) { - SocketAddress address = ctx.channel().remoteAddress(); - promise.addListener(future -> recordWrite(address)); + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem } //"FutureReturnValueIgnored" this is deliberate ctx.write(msg, promise); @@ -106,34 +110,62 @@ else if (msg instanceof ByteBuf) { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { - if (msg instanceof HttpResponse) { - status = ((HttpResponse) msg).status().codeAsText().toString(); + try { + if (msg instanceof HttpResponse) { + status = ((HttpResponse) msg).status().codeAsText().toString(); - dataReceivedTime = System.nanoTime(); - } + dataReceivedTime = System.nanoTime(); + } - if (msg instanceof ByteBufHolder) { - dataReceived += ((ByteBufHolder) msg).content().readableBytes(); - } - else if (msg instanceof ByteBuf) { - dataReceived += ((ByteBuf) msg).readableBytes(); - } + dataReceived += extractProcessedDataFromBuffer(msg); - if (msg instanceof LastHttpContent) { - recordRead(ctx.channel().remoteAddress()); - reset(); + if (msg instanceof LastHttpContent) { + recordRead(ctx.channel().remoteAddress()); + reset(); + } + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem } - ctx.fireChannelRead(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - recordException(ctx); - + try { + recordException(ctx); + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem + } ctx.fireExceptionCaught(cause); } + private void extractDetailsFromHttpRequest(ChannelHandlerContext ctx, HttpRequest request) { + method = request.method().name(); + + ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); + if (channelOps instanceof HttpClientOperations) { + HttpClientOperations ops = (HttpClientOperations) channelOps; + path = uriTagValue == null ? ops.path : uriTagValue.apply(ops.path); + contextView = ops.currentContextView(); + } + + dataSentTime = System.nanoTime(); + } + + private long extractProcessedDataFromBuffer(Object msg) { + if (msg instanceof ByteBufHolder) { + return ((ByteBufHolder) msg).content().readableBytes(); + } + else if (msg instanceof ByteBuf) { + return ((ByteBuf) msg).readableBytes(); + } + return 0; + } + protected abstract HttpClientMetricsRecorder recorder(); protected void recordException(ChannelHandlerContext ctx) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java index 8427a15e80..316f0dd033 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java @@ -26,6 +26,8 @@ import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http2.Http2StreamChannel; import reactor.netty.channel.ChannelOperations; +import reactor.util.Logger; +import reactor.util.Loggers; import reactor.util.annotation.Nullable; import java.time.Duration; @@ -37,6 +39,8 @@ */ abstract class AbstractHttpServerMetricsHandler extends ChannelDuplexHandler { + private static final Logger log = Loggers.getLogger(AbstractHttpServerMetricsHandler.class); + long dataReceived; long dataSent; @@ -67,7 +71,13 @@ public void channelActive(ChannelHandlerContext ctx) { // by the ChannelMetricsHandler itself. ChannelMetricsHandler is only present when the recorder is // not our MicrometerHttpServerMetricsRecorder. See HttpServerConfig class. if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) { - recorder().recordServerConnectionOpened(ctx.channel().localAddress()); + try { + recorder().recordServerConnectionOpened(ctx.channel().localAddress()); + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem + } } ctx.fireChannelActive(); } @@ -75,7 +85,13 @@ public void channelActive(ChannelHandlerContext ctx) { @Override public void channelInactive(ChannelHandlerContext ctx) { if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) { - recorder().recordServerConnectionClosed(ctx.channel().localAddress()); + try { + recorder().recordServerConnectionClosed(ctx.channel().localAddress()); + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem + } } ctx.fireChannelInactive(); } @@ -83,75 +99,82 @@ public void channelInactive(ChannelHandlerContext ctx) { @Override @SuppressWarnings("FutureReturnValueIgnored") public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - if (msg instanceof HttpResponse) { - if (((HttpResponse) msg).status().equals(HttpResponseStatus.CONTINUE)) { - //"FutureReturnValueIgnored" this is deliberate - ctx.write(msg, promise); - return; + try { + if (msg instanceof HttpResponse) { + if (((HttpResponse) msg).status().equals(HttpResponseStatus.CONTINUE)) { + return; + } + + dataSentTime = System.nanoTime(); } - dataSentTime = System.nanoTime(); - } + dataSent += extractProcessedDataFromBuffer(msg); + + if (msg instanceof LastHttpContent) { + promise.addListener(future -> { + try { + ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); + if (channelOps instanceof HttpServerOperations) { + HttpServerOperations ops = (HttpServerOperations) channelOps; + recordWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), + ops.method().name(), ops.status().codeAsText().toString()); + if (!ops.isHttp2() && ops.hostAddress() != null) { + // This metric is not applicable for HTTP/2 + // ops.hostAddress() == null when request decoding failed, in this case + // we do not report active connection, so we do not report inactive connection + recordInactiveConnection(ops); + } + } + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem + } - if (msg instanceof ByteBufHolder) { - dataSent += ((ByteBufHolder) msg).content().readableBytes(); + dataSent = 0; + }); + } } - else if (msg instanceof ByteBuf) { - dataSent += ((ByteBuf) msg).readableBytes(); + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem } + finally { + //"FutureReturnValueIgnored" this is deliberate + ctx.write(msg, promise); + } + } - if (msg instanceof LastHttpContent) { - promise.addListener(future -> { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + try { + if (msg instanceof HttpRequest) { + dataReceivedTime = System.nanoTime(); ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); if (channelOps instanceof HttpServerOperations) { HttpServerOperations ops = (HttpServerOperations) channelOps; - recordWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), - ops.method().name(), ops.status().codeAsText().toString()); - if (!ops.isHttp2() && ops.hostAddress() != null) { + if (!ops.isHttp2()) { // This metric is not applicable for HTTP/2 - // ops.hostAddress() == null when request decoding failed, in this case - // we do not report active connection, so we do not report inactive connection - recordInactiveConnection(ops); + recordActiveConnection(ops); } } + } - dataSent = 0; - }); - } + dataReceived += extractProcessedDataFromBuffer(msg); - //"FutureReturnValueIgnored" this is deliberate - ctx.write(msg, promise); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - if (msg instanceof HttpRequest) { - dataReceivedTime = System.nanoTime(); - ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); - if (channelOps instanceof HttpServerOperations) { - HttpServerOperations ops = (HttpServerOperations) channelOps; - if (!ops.isHttp2()) { - // This metric is not applicable for HTTP/2 - recordActiveConnection(ops); + if (msg instanceof LastHttpContent) { + ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); + if (channelOps instanceof HttpServerOperations) { + HttpServerOperations ops = (HttpServerOperations) channelOps; + recordRead(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), ops.method().name()); } - } - } - if (msg instanceof ByteBufHolder) { - dataReceived += ((ByteBufHolder) msg).content().readableBytes(); - } - else if (msg instanceof ByteBuf) { - dataReceived += ((ByteBuf) msg).readableBytes(); - } - - if (msg instanceof LastHttpContent) { - ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); - if (channelOps instanceof HttpServerOperations) { - HttpServerOperations ops = (HttpServerOperations) channelOps; - recordRead(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), ops.method().name()); + dataReceived = 0; } - - dataReceived = 0; + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem } ctx.fireChannelRead(msg); @@ -159,11 +182,17 @@ else if (msg instanceof ByteBuf) { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); - if (channelOps instanceof HttpServerOperations) { - HttpServerOperations ops = (HttpServerOperations) channelOps; - // Always take the remote address from the operations in order to consider proxy information - recordException(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path)); + try { + ChannelOperations channelOps = ChannelOperations.get(ctx.channel()); + if (channelOps instanceof HttpServerOperations) { + HttpServerOperations ops = (HttpServerOperations) channelOps; + // Always take the remote address from the operations in order to consider proxy information + recordException(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path)); + } + } + catch (RuntimeException e) { + log.warn("Exception caught while recording metrics.", e); + // Allow request-response exchange to continue, unaffected by metrics problem } ctx.fireExceptionCaught(cause); @@ -171,6 +200,16 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { protected abstract HttpServerMetricsRecorder recorder(); + private long extractProcessedDataFromBuffer(Object msg) { + if (msg instanceof ByteBufHolder) { + return ((ByteBufHolder) msg).content().readableBytes(); + } + else if (msg instanceof ByteBuf) { + return ((ByteBuf) msg).readableBytes(); + } + return 0; + } + protected void recordException(HttpServerOperations ops, String path) { // Always take the remote address from the operations in order to consider proxy information recorder().incrementErrorsCount(ops.remoteAddress(), path); diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index 5409cbce0b..c69c075df9 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -233,6 +233,53 @@ else if (clientProtocols.length == 2 && numWrites[1], bytesWrite[1]); } + // https://github.com/reactor/reactor-netty/issues/2187 + @ParameterizedTest + @MethodSource("httpCompatibleProtocols") + void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) { + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + .metrics(true, id -> { + throw new IllegalArgumentException("Testcase injected Exception"); + }) + .bindNow(); + + httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols); + + StepVerifier.create(httpClient.post() + .uri("/1") + .send(body) + .responseContent() + .aggregate() + .asString()) + .expectNext("Hello World!") + .expectComplete() + .verify(Duration.ofSeconds(2)); + } + + // https://github.com/reactor/reactor-netty/issues/2187 + @ParameterizedTest + @MethodSource("httpCompatibleProtocols") + void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) { + disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols) + .bindNow(); + + httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols).metrics(true, id -> { + throw new IllegalArgumentException("Testcase injected Exception"); + }); + + StepVerifier.create(httpClient.post() + .uri("/1") + .send(body) + .responseContent() + .aggregate() + .asString()) + .expectNext("Hello World!") + .expectComplete() + .verify(Duration.ofSeconds(2)); + } + @ParameterizedTest @MethodSource("httpCompatibleProtocols") void testNonExistingEndpoint(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,