Skip to content

Commit

Permalink
Ensure errors caused by recording metrics do not affect the operation…
Browse files Browse the repository at this point in the history
…al code (#2237)

Fixes #2187

Co-authored-by: Violeta Georgieva <violetag@vmware.com>
  • Loading branch information
ChristianLMI and violetagg committed Jun 24, 2022
1 parent 3a9fcc0 commit 86cb1db
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 130 deletions.
Expand Up @@ -22,6 +22,8 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import reactor.netty.NettyPipeline;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

import java.net.SocketAddress;
Expand All @@ -34,6 +36,8 @@
*/
public abstract class AbstractChannelMetricsHandler extends ChannelDuplexHandler {

private static final Logger log = Loggers.getLogger(AbstractChannelMetricsHandler.class);

final SocketAddress remoteAddress;

final boolean onServer;
Expand All @@ -46,15 +50,27 @@ protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, b
@Override
public void channelActive(ChannelHandlerContext ctx) {
if (onServer) {
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
try {
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (onServer) {
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
try {
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ctx.fireChannelInactive();
}
Expand All @@ -79,47 +95,65 @@ public void channelRegistered(ChannelHandlerContext ctx) {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buffer = (ByteBuf) msg;
if (buffer.readableBytes() > 0) {
recordRead(ctx, remoteAddress, buffer.readableBytes());
try {
if (msg instanceof ByteBuf) {
ByteBuf buffer = (ByteBuf) msg;
if (buffer.readableBytes() > 0) {
recordRead(ctx, remoteAddress, buffer.readableBytes());
}
}
}
else if (msg instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) msg;
ByteBuf buffer = p.content();
if (buffer.readableBytes() > 0) {
recordRead(ctx, remoteAddress != null ? remoteAddress : p.sender(), buffer.readableBytes());
else if (msg instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) msg;
ByteBuf buffer = p.content();
if (buffer.readableBytes() > 0) {
recordRead(ctx, remoteAddress != null ? remoteAddress : p.sender(), buffer.readableBytes());
}
}
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}

ctx.fireChannelRead(msg);
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof ByteBuf) {
ByteBuf buffer = (ByteBuf) msg;
if (buffer.readableBytes() > 0) {
recordWrite(ctx, remoteAddress, buffer.readableBytes());
try {
if (msg instanceof ByteBuf) {
ByteBuf buffer = (ByteBuf) msg;
if (buffer.readableBytes() > 0) {
recordWrite(ctx, remoteAddress, buffer.readableBytes());
}
}
}
else if (msg instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) msg;
ByteBuf buffer = p.content();
if (buffer.readableBytes() > 0) {
recordWrite(ctx, remoteAddress != null ? remoteAddress : p.recipient(), buffer.readableBytes());
else if (msg instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) msg;
ByteBuf buffer = p.content();
if (buffer.readableBytes() > 0) {
recordWrite(ctx, remoteAddress != null ? remoteAddress : p.recipient(), buffer.readableBytes());
}
}
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}

//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
recordException(ctx, remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress());
try {
recordException(ctx, remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress());
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}

ctx.fireExceptionCaught(cause);
}
Expand Down
Expand Up @@ -22,6 +22,8 @@
import io.netty.util.concurrent.Promise;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.internal.util.MapUtils;
import reactor.util.Logger;
import reactor.util.Loggers;

import java.net.SocketAddress;
import java.time.Duration;
Expand All @@ -39,6 +41,8 @@
*/
final class AddressResolverGroupMetrics<T extends SocketAddress> extends AddressResolverGroup<T> {

private static final Logger log = Loggers.getLogger(AddressResolverGroupMetrics.class);

static final ConcurrentMap<Integer, AddressResolverGroupMetrics<?>> cache = new ConcurrentHashMap<>();

static AddressResolverGroupMetrics<?> getOrCreate(
Expand Down Expand Up @@ -116,10 +120,16 @@ Future<List<T>> resolveAllInternal(SocketAddress address, Supplier<Future<List<T
}

void record(long resolveTimeStart, String status, SocketAddress remoteAddress) {
recorder.recordResolveAddressTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - resolveTimeStart),
status);
try {
recorder.recordResolveAddressTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - resolveTimeStart),
status);
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
};
}
Expand Down
Expand Up @@ -376,15 +376,21 @@ protected void initChannel(Channel channel) {
ChannelOperations.addMetricsHandler(channel, config.metricsRecorder, remoteAddress, onServer);

if (Metrics.isInstrumentationAvailable()) {
ByteBufAllocator alloc = channel.alloc();
if (alloc instanceof PooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled", ((PooledByteBufAllocator) alloc).metric(), alloc);
try {
ByteBufAllocator alloc = channel.alloc();
if (alloc instanceof PooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled", ((PooledByteBufAllocator) alloc).metric(), alloc);
}
else if (alloc instanceof UnpooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled", ((UnpooledByteBufAllocator) alloc).metric(), alloc);
}

MicrometerEventLoopMeterRegistrar.INSTANCE.registerMetrics(channel.eventLoop());
}
else if (alloc instanceof UnpooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled", ((UnpooledByteBufAllocator) alloc).metric(), alloc);
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}

MicrometerEventLoopMeterRegistrar.INSTANCE.registerMetrics(channel.eventLoop());
}
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,10 @@
package reactor.netty.transport;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.SingleThreadEventExecutor;
Expand Down Expand Up @@ -115,6 +117,52 @@ void testEventLoopMetrics() throws InterruptedException {
}
}

// https://github.com/reactor/reactor-netty/issues/2187
@Test
void testEventLoopMetricsFailure() throws InterruptedException {
registry.config().meterFilter(new MeterFilter() {
@Override
public Meter.Id map(Meter.Id id) {
throw new IllegalArgumentException("Test injected Exception");
}
});

final CountDownLatch latch = new CountDownLatch(1);
DisposableServer server = null;
Connection client = null;
LoopResources loop = null;

try {
loop = LoopResources.create(TransportEventLoopMetricsTest.class.getName(), 3, true);
server = TcpServer.create()
.port(0)
.metrics(true)
.runOn(loop)
.doOnConnection(c -> latch.countDown())
.bindNow();

assertThat(server).isNotNull();
client = TcpClient.create()
.port(server.port())
.connectNow();

assertThat(client).isNotNull();
assertThat(latch.await(5, TimeUnit.SECONDS)).as("Failed to connect").isTrue();
}

finally {
if (client != null) {
client.disposeNow();
}
if (server != null) {
server.disposeNow();
}
if (loop != null) {
loop.disposeLater().block(Duration.ofSeconds(10));
}
}
}

private double getGaugeValue(String name, String... tags) {
Gauge gauge = registry.find(name).tags(tags).gauge();
double result = -1;
Expand Down

0 comments on commit 86cb1db

Please sign in to comment.