Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 2187 #2237

Merged
merged 10 commits into from Jun 24, 2022
Expand Up @@ -22,6 +22,8 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.DatagramPacket;
import reactor.netty.NettyPipeline;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

import java.net.SocketAddress;
Expand All @@ -34,6 +36,8 @@
*/
public abstract class AbstractChannelMetricsHandler extends ChannelDuplexHandler {

private static final Logger log = Loggers.getLogger(AbstractChannelMetricsHandler.class);

final SocketAddress remoteAddress;

final boolean onServer;
Expand All @@ -46,15 +50,27 @@ protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, b
@Override
public void channelActive(ChannelHandlerContext ctx) {
if (onServer) {
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
try {
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (onServer) {
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
try {
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ctx.fireChannelInactive();
}
Expand All @@ -79,47 +95,65 @@ public void channelRegistered(ChannelHandlerContext ctx) {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buffer = (ByteBuf) msg;
if (buffer.readableBytes() > 0) {
recordRead(ctx, remoteAddress, buffer.readableBytes());
try {
if (msg instanceof ByteBuf) {
ByteBuf buffer = (ByteBuf) msg;
if (buffer.readableBytes() > 0) {
recordRead(ctx, remoteAddress, buffer.readableBytes());
}
}
}
else if (msg instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) msg;
ByteBuf buffer = p.content();
if (buffer.readableBytes() > 0) {
recordRead(ctx, remoteAddress != null ? remoteAddress : p.sender(), buffer.readableBytes());
else if (msg instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) msg;
ByteBuf buffer = p.content();
if (buffer.readableBytes() > 0) {
recordRead(ctx, remoteAddress != null ? remoteAddress : p.sender(), buffer.readableBytes());
}
}
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}

ctx.fireChannelRead(msg);
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof ByteBuf) {
ByteBuf buffer = (ByteBuf) msg;
if (buffer.readableBytes() > 0) {
recordWrite(ctx, remoteAddress, buffer.readableBytes());
try {
if (msg instanceof ByteBuf) {
ByteBuf buffer = (ByteBuf) msg;
if (buffer.readableBytes() > 0) {
recordWrite(ctx, remoteAddress, buffer.readableBytes());
}
}
}
else if (msg instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) msg;
ByteBuf buffer = p.content();
if (buffer.readableBytes() > 0) {
recordWrite(ctx, remoteAddress != null ? remoteAddress : p.recipient(), buffer.readableBytes());
else if (msg instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) msg;
ByteBuf buffer = p.content();
if (buffer.readableBytes() > 0) {
recordWrite(ctx, remoteAddress != null ? remoteAddress : p.recipient(), buffer.readableBytes());
}
}
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}

//"FutureReturnValueIgnored" this is deliberate
ctx.write(msg, promise);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
recordException(ctx, remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress());
try {
recordException(ctx, remoteAddress != null ? remoteAddress : ctx.channel().remoteAddress());
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}

ctx.fireExceptionCaught(cause);
}
Expand Down
Expand Up @@ -22,6 +22,8 @@
import io.netty.util.concurrent.Promise;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.internal.util.MapUtils;
import reactor.util.Logger;
import reactor.util.Loggers;

import java.net.SocketAddress;
import java.time.Duration;
Expand All @@ -39,6 +41,8 @@
*/
final class AddressResolverGroupMetrics<T extends SocketAddress> extends AddressResolverGroup<T> {

private static final Logger log = Loggers.getLogger(AddressResolverGroupMetrics.class);

static final ConcurrentMap<Integer, AddressResolverGroupMetrics<?>> cache = new ConcurrentHashMap<>();

static AddressResolverGroupMetrics<?> getOrCreate(
Expand Down Expand Up @@ -116,10 +120,16 @@ Future<List<T>> resolveAllInternal(SocketAddress address, Supplier<Future<List<T
}

void record(long resolveTimeStart, String status, SocketAddress remoteAddress) {
recorder.recordResolveAddressTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - resolveTimeStart),
status);
try {
recorder.recordResolveAddressTime(
remoteAddress,
Duration.ofNanos(System.nanoTime() - resolveTimeStart),
status);
}
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
};
}
Expand Down
Expand Up @@ -376,15 +376,21 @@ protected void initChannel(Channel channel) {
ChannelOperations.addMetricsHandler(channel, config.metricsRecorder, remoteAddress, onServer);

if (Metrics.isInstrumentationAvailable()) {
ByteBufAllocator alloc = channel.alloc();
if (alloc instanceof PooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled", ((PooledByteBufAllocator) alloc).metric(), alloc);
try {
ByteBufAllocator alloc = channel.alloc();
if (alloc instanceof PooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("pooled", ((PooledByteBufAllocator) alloc).metric(), alloc);
}
else if (alloc instanceof UnpooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled", ((UnpooledByteBufAllocator) alloc).metric(), alloc);
}

MicrometerEventLoopMeterRegistrar.INSTANCE.registerMetrics(channel.eventLoop());
}
else if (alloc instanceof UnpooledByteBufAllocator) {
ByteBufAllocatorMetrics.INSTANCE.registerMetrics("unpooled", ((UnpooledByteBufAllocator) alloc).metric(), alloc);
catch (RuntimeException e) {
log.warn("Exception caught while recording metrics.", e);
// Allow request-response exchange to continue, unaffected by metrics problem
violetagg marked this conversation as resolved.
Show resolved Hide resolved
}

MicrometerEventLoopMeterRegistrar.INSTANCE.registerMetrics(channel.eventLoop());
}
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,10 @@
package reactor.netty.transport;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.SingleThreadEventExecutor;
Expand Down Expand Up @@ -115,6 +117,52 @@ void testEventLoopMetrics() throws InterruptedException {
}
}

// https://github.com/reactor/reactor-netty/issues/2187
@Test
void testEventLoopMetricsFailure() throws InterruptedException {
registry.config().meterFilter(new MeterFilter() {
@Override
public Meter.Id map(Meter.Id id) {
violetagg marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalArgumentException("Test injected Exception");
}
});

final CountDownLatch latch = new CountDownLatch(1);
DisposableServer server = null;
Connection client = null;
LoopResources loop = null;

try {
loop = LoopResources.create(TransportEventLoopMetricsTest.class.getName(), 3, true);
server = TcpServer.create()
.port(0)
.metrics(true)
.runOn(loop)
.doOnConnection(c -> latch.countDown())
.bindNow();

assertThat(server).isNotNull();
client = TcpClient.create()
.port(server.port())
.connectNow();

assertThat(client).isNotNull();
assertThat(latch.await(5, TimeUnit.SECONDS)).as("Failed to connect").isTrue();
}

finally {
if (client != null) {
client.disposeNow();
}
if (server != null) {
server.disposeNow();
}
if (loop != null) {
loop.disposeLater().block(Duration.ofSeconds(10));
}
}
}

private double getGaugeValue(String name, String... tags) {
Gauge gauge = registry.find(name).tags(tags).gauge();
double result = -1;
Expand Down