Skip to content

Commit

Permalink
Added additional error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristianLMI committed Jun 21, 2022
1 parent 2ccd6ea commit b4f2231
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 60 deletions.
Expand Up @@ -80,23 +80,29 @@ protected AbstractHttpClientMetricsHandler(AbstractHttpClientMetricsHandler copy
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof HttpRequest) {
extractDetailsFromHttpRequest(ctx, (HttpRequest) msg);
}
try {
if (msg instanceof HttpRequest) {
extractDetailsFromHttpRequest(ctx, (HttpRequest) msg);
}

dataSent += extractProcessedDataFromBuffer(msg);

if (msg instanceof LastHttpContent) {
SocketAddress address = ctx.channel().remoteAddress();
promise.addListener(future -> {
try {
recordWrite(address);
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
});
dataSent += extractProcessedDataFromBuffer(msg);

if (msg instanceof LastHttpContent) {
SocketAddress address = ctx.channel().remoteAddress();
promise.addListener(future -> {
try {
recordWrite(address);
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
});
}
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
Expand Down
Expand Up @@ -71,61 +71,78 @@ public void channelActive(ChannelHandlerContext ctx) {
// by the ChannelMetricsHandler itself. ChannelMetricsHandler is only present when the recorder is
// not our MicrometerHttpServerMetricsRecorder. See HttpServerConfig class.
if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) {
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
try {
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) {
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
try {
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ctx.fireChannelInactive();
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof HttpResponse) {
if (((HttpResponse) msg).status().equals(HttpResponseStatus.CONTINUE)) {
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
return;
try {
if (msg instanceof HttpResponse) {
if (((HttpResponse) msg).status().equals(HttpResponseStatus.CONTINUE)) {
return;
}

dataSentTime = System.nanoTime();
}

dataSentTime = System.nanoTime();
}
dataSent += extractProcessedDataFromBuffer(msg);

dataSent += extractProcessedDataFromBuffer(msg);

if (msg instanceof LastHttpContent) {
promise.addListener(future -> {
try {
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
recordWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path),
ops.method().name(), ops.status().codeAsText().toString());
if (!ops.isHttp2() && ops.hostAddress() != null) {
// This metric is not applicable for HTTP/2
// ops.hostAddress() == null when request decoding failed, in this case
// we do not report active connection, so we do not report inactive connection
recordInactiveConnection(ops);
if (msg instanceof LastHttpContent) {
promise.addListener(future -> {
try {
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
recordWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path),
ops.method().name(), ops.status().codeAsText().toString());
if (!ops.isHttp2() && ops.hostAddress() != null) {
// This metric is not applicable for HTTP/2
// ops.hostAddress() == null when request decoding failed, in this case
// we do not report active connection, so we do not report inactive connection
recordInactiveConnection(ops);
}
}
}
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}

dataSent = 0;
});
dataSent = 0;
});
}
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
finally {
//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}

//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}

@Override
Expand Down
Expand Up @@ -237,17 +237,14 @@ else if (clientProtocols.length == 2 &&
@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) {
disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.metrics(true, id -> {
throw new IllegalArgumentException("Testcase injected Exception");
})
.bindNow();

AtomicReference<SocketAddress> serverAddress = new AtomicReference<>();
httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols).doAfterRequest((req, conn) ->
serverAddress.set(conn.channel().remoteAddress())
);
httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols);

StepVerifier.create(httpClient.post()
.uri("/1")
Expand All @@ -264,14 +261,11 @@ void testRecordingFailsServerSide(HttpProtocol[] serverProtocols, HttpProtocol[]
@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testRecordingFailsClientSide(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) {
disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.bindNow();

AtomicReference<SocketAddress> serverAddress = new AtomicReference<>();
httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols).doAfterRequest((req, conn) ->
serverAddress.set(conn.channel().remoteAddress())
).metrics(true, id -> {
httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols).metrics(true, id -> {
throw new IllegalArgumentException("Testcase injected Exception");
});

Expand Down

0 comments on commit b4f2231

Please sign in to comment.